Use binary search to improve DimensionRangeShardSpec lookup #12417

Merged: kfaraz merged 4 commits into apache:master from hqx871:opt_multi_range_lookup on Apr 15, 2022

@hqx871 hqx871 commented Apr 9, 2022

I tried using binary search to improve the DimensionRangeShardSpec lookup; the details are discussed in #11848. I hope this is useful to others.

Description

I have implemented Hadoop-based batch ingestion with the range shard spec, modeled on native batch ingestion and Hadoop-based single_dim ingestion.
When I used this Hadoop-based batch ingestion to load about 70 billion rows, the mappers of IndexGeneratorJob took more than 11 hours. I checked and found that most of the time was spent calling DimensionRangeShardSpec.isInChunk to look up the target shard, as there are about 6000 shards. Using binary search, instead of comparing the row against every shardSpec to decide its shard, improves this substantially: the mappers now take only 30 minutes.
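
For readers unfamiliar with the idea, here is a minimal sketch of the two lookup strategies. It is not the Druid code: it uses plain strings in place of StringTuple, and the names ShardLookupSketch, RangeShard, linearLookup and binaryLookup are hypothetical. It assumes the shards are sorted by start, do not overlap, and that a null bound means unbounded.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardLookupSketch
{
  // Simplified stand-in for a range shard covering [start, end).
  static final class RangeShard
  {
    final String start; // inclusive; null means no lower bound
    final String end;   // exclusive; null means no upper bound

    RangeShard(String start, String end)
    {
      this.start = start;
      this.end = end;
    }

    boolean contains(String key)
    {
      return (start == null || key.compareTo(start) >= 0)
             && (end == null || key.compareTo(end) < 0);
    }
  }

  // Baseline: test every shard until one contains the key, O(n) per row.
  static int linearLookup(List<RangeShard> shards, String key)
  {
    for (int i = 0; i < shards.size(); i++) {
      if (shards.get(i).contains(key)) {
        return i;
      }
    }
    return -1;
  }

  // Binary search over shards sorted by start, O(log n) per row.
  static int binaryLookup(List<RangeShard> shards, String key)
  {
    int lo = 0;
    int hi = shards.size() - 1;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      RangeShard shard = shards.get(mid);
      if (shard.start != null && key.compareTo(shard.start) < 0) {
        hi = mid - 1; // key sorts before this shard
      } else if (shard.end == null || key.compareTo(shard.end) < 0) {
        return mid;   // start <= key < end
      } else {
        lo = mid + 1; // key sorts at or after this shard's end
      }
    }
    return -1;
  }

  public static void main(String[] args)
  {
    List<RangeShard> shards = new ArrayList<>();
    shards.add(new RangeShard(null, "c"));
    shards.add(new RangeShard("c", "h"));
    shards.add(new RangeShard("h", null));
    for (String key : new String[]{"a", "c", "g", "h", "z"}) {
      System.out.println(key + " -> linear=" + linearLookup(shards, key)
                         + ", binary=" + binaryLookup(shards, key));
    }
  }
}
```

With about 6000 shards, the binary search needs at most 13 probes per row, where the linear scan may need thousands.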


Key changed/added classes in this PR
  • BaseDimensionRangeShardSpec
  • DimensionRangeShardSpec
  • SingleDimensionShardSpec
  • DimensionRangeBucketShardSpec

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@hqx871 hqx871 force-pushed the opt_multi_range_lookup branch from 745cf67 to 2eff273 Compare April 9, 2022 04:38
@FrankChen021 FrankChen021 requested a review from kfaraz April 12, 2022 02:05

@kfaraz kfaraz left a comment

Thanks for the changes, @hqx871.
The code looks much cleaner now.

Requested minor changes.
Could you maybe also update the PR description with the perf impact of these changes that you found while testing?

Comment on lines +80 to +94
return (long timestamp, InputRow row) -> {
  StringTuple inputRowTuple = getInputRowTuple(dimensions, row);
  int startIndex = 0;
  int endIndex = shardSpecs.size() - 1;
  while (startIndex <= endIndex) {
    int mid = (startIndex + endIndex) >>> 1;
    BaseDimensionRangeShardSpec rangeShardSpec = rangeShardSpecs[mid];
    if (startComparator.compare(inputRowTuple, rangeShardSpec.start) < 0) {
      endIndex = mid - 1;
    } else if (endComparator.compare(inputRowTuple, rangeShardSpec.end) < 0) {
      return rangeShardSpec;
    } else {
      startIndex = mid + 1;
    }
  }

Contributor

You could probably simplify this using Arrays.binarySearch(array, key, comparator)

Contributor Author

The array's component type is BaseDimensionRangeShardSpec while the key is a StringTuple, so I cannot call Arrays.binarySearch directly.

Contributor

You could use a dummy key. It would still be cleaner than writing the binary search logic yourself.
Something like this:

final StringTuple searchTuple = getInputRowTuple(dimensions, row);
final BaseDimensionRangeShardSpec searchKey = new DimensionRangeShardSpec(dimensions, searchTuple, searchTuple, 0, 1);
final int searchResult = Arrays.binarySearch(rangeShardSpecs, searchKey, shardSpecComparator);
if (searchResult < 0) {
    throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
} else {
    return rangeShardSpecs[searchResult];
}

Please let me know if this seems cleaner.

Contributor Author (hqx871, Apr 12, 2022)

Arrays.binarySearch requires the search key to equal an array element, whereas we need to find the element that equals or contains the search key. For example:

input:

searchKey: ["a","a"]
shardSpecs: [[null,"c"], ["c","h"], ["h",null]]

The expected result is [null,"c"], but Arrays.binarySearch returns -2, not 0.
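
A reduced reproduction of this example is sketched below, assuming single-value string bounds instead of StringTuple and a comparator that orders by start (nulls first) and then by end (nulls last). The class and method names are hypothetical and not part of Druid.

```java
import java.util.Arrays;
import java.util.Comparator;

final class BinarySearchMissSketch
{
  static final class Range
  {
    final String start; // null means no lower bound
    final String end;   // null means no upper bound

    Range(String start, String end)
    {
      this.start = start;
      this.end = end;
    }
  }

  static final Comparator<String> START_ORDER = Comparator.nullsFirst(Comparator.naturalOrder());
  static final Comparator<String> END_ORDER = Comparator.nullsLast(Comparator.naturalOrder());

  // Orders ranges by start, then by end, written out explicitly.
  static final Comparator<Range> RANGE_ORDER = (a, b) -> {
    int byStart = START_ORDER.compare(a.start, b.start);
    return byStart != 0 ? byStart : END_ORDER.compare(a.end, b.end);
  };

  static int search(Range[] sortedRanges, String key)
  {
    // Dummy key whose start and end are both the searched value.
    return Arrays.binarySearch(sortedRanges, new Range(key, key), RANGE_ORDER);
  }

  public static void main(String[] args)
  {
    Range[] ranges = {new Range(null, "c"), new Range("c", "h"), new Range("h", null)};
    // No element compares equal to ("a", "a"), so the result is
    // -(insertionPoint) - 1 with insertionPoint = 1.
    System.out.println(search(ranges, "a")); // prints -2
  }
}
```

The negative result encodes only where the dummy key would be inserted, not which range contains it, which is why an exact-match search does not fit this lookup directly.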

Comment on lines +64 to +77
final Comparator<StringTuple> startComparator = Comparators.naturalNullsFirst();
final Comparator<StringTuple> endComparator = Ordering.natural().nullsLast();
final Comparator<BaseDimensionRangeShardSpec> shardSpecComparator = new Comparator<BaseDimensionRangeShardSpec>()
{
  @Override
  public int compare(BaseDimensionRangeShardSpec o1, BaseDimensionRangeShardSpec o2)
  {
    int startComparison = startComparator.compare(o1.start, o2.start);
    if (startComparison != 0) {
      return startComparison;
    }
    return endComparator.compare(o1.end, o2.end);
  }
};

Contributor

Contributor Author

The start comparator orders nulls first while the end comparator orders nulls last. I do not know how to chain the comparators.

Contributor

Something like this:

    final Comparator<BaseDimensionRangeShardSpec> shardSpecComparator = Comparator
        .comparing((BaseDimensionRangeShardSpec spec) -> spec.start, Comparators.naturalNullsFirst())
        .thenComparing(spec -> spec.end, Ordering.natural().nullsLast());

    Arrays.sort(rangeShardSpecs, shardSpecComparator);
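
The same chaining can be tried with only the JDK. The sketch below assumes Comparator.nullsFirst and Comparator.nullsLast as stand-ins for Comparators.naturalNullsFirst() and Ordering.natural().nullsLast(), and uses a hypothetical Range class with string bounds instead of BaseDimensionRangeShardSpec.

```java
import java.util.Arrays;
import java.util.Comparator;

final class ChainedComparatorSketch
{
  static final class Range
  {
    final String start; // null means no lower bound
    final String end;   // null means no upper bound

    Range(String start, String end)
    {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString()
    {
      return "[" + start + ", " + end + ")";
    }
  }

  // Start bounds sort with null first, end bounds with null last.
  static Comparator<Range> byStartThenEnd()
  {
    Comparator<String> startOrder = Comparator.nullsFirst(Comparator.naturalOrder());
    Comparator<String> endOrder = Comparator.nullsLast(Comparator.naturalOrder());
    return Comparator
        .comparing((Range r) -> r.start, startOrder)
        .thenComparing(r -> r.end, endOrder);
  }

  public static void main(String[] args)
  {
    Range[] ranges = {
        new Range("h", null),
        new Range("c", null),
        new Range(null, "c"),
        new Range("c", "h")
    };
    Arrays.sort(ranges, byStartThenEnd());
    // Order after sorting: (null, c), (c, h), (c, null), (h, null)
    System.out.println(Arrays.toString(ranges));
  }
}
```

Each key extractor carries its own null policy, so the two different null orderings do not conflict when chained.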

Contributor Author

Got it

Comment on lines +60 to +63
BaseDimensionRangeShardSpec[] rangeShardSpecs = new BaseDimensionRangeShardSpec[shardSpecs.size()];
for (int i = 0; i < shardSpecs.size(); i++) {
  rangeShardSpecs[i] = (BaseDimensionRangeShardSpec) shardSpecs.get(i);
}

Contributor

Nit: You could use shardSpecs.toArray() for cleaner code.

Contributor Author

Calling toArray directly causes a compile error, as the component type is not BaseDimensionRangeShardSpec.

Contributor

Ah, that makes sense. I had missed the cast.
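
For context, the no-argument toArray() returns Object[], which cannot be assigned to a subtype array. The sketch below shows two JDK-only ways to get a subtype array from a list of a supertype. It uses hypothetical Shard and RangeShard types, and this thread does not show whether either form was used in the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class ToArrayCastSketch
{
  interface Shard
  {
  }

  static final class RangeShard implements Shard
  {
    final String name;

    RangeShard(String name)
    {
      this.name = name;
    }
  }

  // Typed toArray: the result array takes the component type of the argument.
  // Throws ArrayStoreException at runtime if an element is not a RangeShard.
  static RangeShard[] viaTypedToArray(List<Shard> shards)
  {
    return shards.toArray(new RangeShard[0]);
  }

  // Stream form with an explicit per-element cast.
  // Throws ClassCastException at runtime if an element is not a RangeShard.
  static RangeShard[] viaStream(List<Shard> shards)
  {
    return shards.stream().map(RangeShard.class::cast).toArray(RangeShard[]::new);
  }

  public static void main(String[] args)
  {
    List<Shard> shards = new ArrayList<>();
    shards.add(new RangeShard("a"));
    shards.add(new RangeShard("b"));
    System.out.println(viaTypedToArray(shards).length);
    System.out.println(viaStream(shards).length);
  }
}
```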

{
  final String[] inputDimensionValues = new String[dimensions.size()];
  for (int i = 0; i < dimensions.size(); ++i) {
    List<String> values = inputRow.getDimension(dimensions.get(i));

Contributor

Nit: please add the comment originally present in this method.
// Get the values of this dimension, treat multiple values as null

Contributor Author

added back

return isInChunk(dimensions, start, end, inputRow);
}

public static boolean isInChunk(

Contributor

You could probably remove this method. I don't think it is needed anymore.

Contributor Author

done

@hqx871 hqx871 force-pushed the opt_multi_range_lookup branch from da954eb to 506312f Compare April 12, 2022 06:30

hqx871 commented Apr 14, 2022

Thanks for your time reviewing my code, @kfaraz.

@kfaraz kfaraz left a comment

The changes look good, @hqx871 .
Only tests seem to be missing, sorry I didn't take a thorough pass at the tests in my first review. Please fix up the tests to verify the new code.

private final List<String> dimensions = new ArrayList<>();

@Test
public void testIsInChunk()

Contributor

Please don't remove these tests altogether as there is nothing else testing this behaviour.
You can convert these tests to test getLookup instead.

Contributor

You could create multiple shard specs, representing adjacent (or non-adjacent too if you are testing for it) partitions. Then create a ShardSpecLookup using the getLookup method. Then convert the assertions accordingly.

Contributor Author

done

Comment on lines -59 to -62
assertTrue(isInChunk(
    shardSpec,
    createRow("India", "Delhi")
));

Contributor

These lines (and all the other assertions) could be translated to something like:

assertEquals(
    shard1,
    lookup.getShardSpec(timestamp, createRow("India", "Delhi"))
);

Contributor Author

Got it

@kfaraz kfaraz left a comment

LGTM 🚀

@kfaraz kfaraz merged commit a22d413 into apache:master Apr 15, 2022

kfaraz commented Apr 15, 2022

"I have implemented Hadoop-based batch ingestion with the range shard spec, modeled on native batch ingestion and Hadoop-based single_dim ingestion."

@hqx871 , do you mean you have implemented range partitioning for hadoop-based ingestion?
If yes, it would be great if you would consider contributing those changes here. :)
It might be very useful for other people using hadoop-based ingestion.

hqx871 commented Apr 16, 2022

Hi @kfaraz, I will contribute it soon

@abhishekagarwal87 abhishekagarwal87 added this to the 0.23.0 milestone May 11, 2022