Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline.partition;

import com.google.common.collect.Ordering;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public abstract class BaseDimensionRangeShardSpec implements ShardSpec
{
protected final List<String> dimensions;
@Nullable
protected final StringTuple start;
@Nullable
protected final StringTuple end;

/**
 * Captures the state shared by range-based shard specs: the partition dimensions and the
 * two bounds of the range. The arguments are stored as given; this constructor performs no
 * validation or copying.
 *
 * @param dimensions names of the dimensions the range is defined over
 * @param start      lower bound of the range, may be {@code null}
 * @param end        upper bound of the range, may be {@code null}
 */
protected BaseDimensionRangeShardSpec(
    List<String> dimensions,
    @Nullable StringTuple start,
    @Nullable StringTuple end
)
{
  // The three assignments are independent of each other; bounds first, then dimensions.
  this.end = end;
  this.start = start;
  this.dimensions = dimensions;
}

/**
 * Returns a lookup over {@code shardSpecs} that is keyed on this spec's own partition
 * dimensions. All of the work is delegated to {@code createLookup}.
 *
 * @param shardSpecs the shard specs the returned lookup will search
 * @return the lookup produced by {@code createLookup}
 */
@Override
public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
  final ShardSpecLookup lookup = createLookup(this.dimensions, shardSpecs);
  return lookup;
}

/**
 * Builds a {@link ShardSpecLookup} that finds, for an input row, the shard spec whose
 * half-open range [start, end) contains the row's dimension tuple.
 *
 * The given shard specs are copied into an array, sorted by start and then by end, and the
 * returned lookup runs a binary search over that sorted array for every row.
 *
 * @param dimensions partition dimensions used to build the tuple of each input row
 * @param shardSpecs shard specs to search; every element is cast to
 *                   {@link BaseDimensionRangeShardSpec}, so the cast fails for any other type
 * @return a lookup that returns the matching shard spec, or throws an {@link ISE} when the
 *         binary search finds no range containing the row
 */
private static ShardSpecLookup createLookup(List<String> dimensions, List<? extends ShardSpec> shardSpecs)
{
// Copy element by element because each one needs a cast to the range shard spec type.
BaseDimensionRangeShardSpec[] rangeShardSpecs = new BaseDimensionRangeShardSpec[shardSpecs.size()];
for (int i = 0; i < shardSpecs.size(); i++) {
rangeShardSpecs[i] = (BaseDimensionRangeShardSpec) shardSpecs.get(i);
}
Comment on lines +60 to +63
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: You could use shardSpecs.toArray() for cleaner code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Directly call toArray cause compile error as the component is not BaseDimensionRangeShardSpec

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense. I had missed the cast.

// A null start is ordered before every tuple and a null end after every tuple.
// NOTE(review): the nulls-first behavior of Comparators.naturalNullsFirst() is taken from its name - confirm.
final Comparator<StringTuple> startComparator = Comparators.naturalNullsFirst();
final Comparator<StringTuple> endComparator = Ordering.natural().nullsLast();

// Order the specs by start bound, breaking ties by end bound.
final Comparator<BaseDimensionRangeShardSpec> shardSpecComparator = Comparator
.comparing((BaseDimensionRangeShardSpec spec) -> spec.start, startComparator)
.thenComparing(spec -> spec.end, endComparator);

Arrays.sort(rangeShardSpecs, shardSpecComparator);

// The timestamp argument is not used; only the row's dimension values decide the result.
return (long timestamp, InputRow row) -> {
StringTuple inputRowTuple = getInputRowTuple(dimensions, row);
// Binary search over the sorted array.
// NOTE(review): this presumably relies on the ranges not overlapping - confirm against callers.
int startIndex = 0;
int endIndex = shardSpecs.size() - 1;
while (startIndex <= endIndex) {
// Unsigned shift halves the sum of the two indices.
int mid = (startIndex + endIndex) >>> 1;
BaseDimensionRangeShardSpec rangeShardSpec = rangeShardSpecs[mid];
if (startComparator.compare(inputRowTuple, rangeShardSpec.start) < 0) {
// Row sorts before this range's start: continue in the lower half.
endIndex = mid - 1;
} else if (endComparator.compare(inputRowTuple, rangeShardSpec.end) < 0) {
// Row is at or after start and strictly before end: this range contains it.
return rangeShardSpec;
} else {
// Row is at or after this range's end: continue in the upper half.
startIndex = mid + 1;
}
}
Comment on lines +73 to +87
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could probably simplify this using Arrays.binarySearch(array, key, comparator)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The array component is BaseDimensionRangeShardSpec while the key is StringTuple, so I can not directly call Arrays.binarySearch

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use a dummy key. It would still be cleaner than writing the binary search logic yourself.
Something like this:

final StringTuple searchTuple = getInputRowTuple(dimensions, row);
final BaseDimensionRangeShardSpec searchKey = new DimensionRangeShardSpec(dimensions, searchTuple, searchTuple, 0, 1);
final int searchResult = Arrays.binarySearch(rangeShardSpecs, searchKey, shardSpecComparator);
if (searchResult < 0) {
   throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
} else {
    return rangeShardSpecs[searchResult];
}

Please let me know if this seems cleaner.

Copy link
Copy Markdown
Contributor Author

@hqx871 hqx871 Apr 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arrays.binarySearch requires the searchKey to equals one array element, while actually we do need find one that equals or contains the searchKey. For example:

input:

searchKey: ["a","a"]
shardSpecs:[[null,"c"], ["c", "h"],["h",null]]

then the expect result is [null, "c"], but the Arrays.binarySearch will return -2, not 0.

// The search ended without finding a range that contains the row.
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

/**
 * Builds the {@link StringTuple} of an input row for the given partition dimensions.
 *
 * The tuple has one entry per dimension, in the order of {@code dimensions}. An entry is the
 * dimension's value when the row holds exactly one value for it, and {@code null} otherwise.
 *
 * @param dimensions names of the dimensions to read from the row
 * @param inputRow   row to read the dimension values from
 * @return tuple created from the per-dimension values
 */
protected static StringTuple getInputRowTuple(List<String> dimensions, InputRow inputRow)
{
// One slot per partition dimension, filled in the same order as the dimension list.
final String[] inputDimensionValues = new String[dimensions.size()];
for (int i = 0; i < dimensions.size(); ++i) {
// Get the values of this dimension, treat multiple values as null
List<String> values = inputRow.getDimension(dimensions.get(i));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: please add the comment originally present in this method.
// Get the values of this dimension, treat multiple values as null

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added back

// Only a list with exactly one element contributes a value; a null list, an empty list
// or a list with several values all yield null for this slot.
inputDimensionValues[i] = values != null && values.size() == 1 ? values.get(0) : null;
}

return StringTuple.create(inputDimensionValues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;

import javax.annotation.Nullable;
import java.util.List;
Expand All @@ -40,14 +38,10 @@
* @see BuildingSingleDimensionShardSpec
* @see BuildingDimensionRangeShardSpec
*/
public class DimensionRangeBucketShardSpec implements BucketNumberedShardSpec<BuildingDimensionRangeShardSpec>
public class DimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec
implements BucketNumberedShardSpec<BuildingDimensionRangeShardSpec>
{
private final int bucketId;
private final List<String> dimensions;
@Nullable
private final StringTuple start;
@Nullable
private final StringTuple end;

@JsonCreator
public DimensionRangeBucketShardSpec(
Expand All @@ -57,6 +51,7 @@ public DimensionRangeBucketShardSpec(
@JsonProperty("end") @Nullable StringTuple end
)
{
super(dimensions, start, end);
// Verify that the tuple sizes and number of dimensions are the same
Preconditions.checkArgument(
start == null || start.size() == dimensions.size(),
Expand All @@ -68,9 +63,6 @@ public DimensionRangeBucketShardSpec(
);

this.bucketId = bucketId;
this.dimensions = dimensions;
this.start = start;
this.end = end;
}

@Override
Expand Down Expand Up @@ -119,24 +111,6 @@ public BuildingDimensionRangeShardSpec convert(int partitionId)
);
}

@Override
public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (((DimensionRangeBucketShardSpec) spec).isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

private boolean isInChunk(InputRow inputRow)
{
return DimensionRangeShardSpec.isInChunk(dimensions, start, end, inputRow);
}

@Override
public String getType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;

import javax.annotation.Nullable;
import java.util.Collections;
Expand All @@ -38,15 +36,10 @@
/**
* {@link ShardSpec} for partitioning based on ranges of one or more dimensions.
*/
public class DimensionRangeShardSpec implements ShardSpec
public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
{
public static final int UNKNOWN_NUM_CORE_PARTITIONS = -1;

private final List<String> dimensions;
@Nullable
private final StringTuple start;
@Nullable
private final StringTuple end;
private final int partitionNum;
private final int numCorePartitions;

Expand All @@ -65,15 +58,13 @@ public DimensionRangeShardSpec(
@JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions // nullable for backward compatibility
)
{
super(dimensions, start, end);
Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0");
Preconditions.checkArgument(
dimensions != null && !dimensions.isEmpty(),
"dimensions should be non-null and non-empty"
);

this.dimensions = dimensions;
this.start = start;
this.end = end;
this.partitionNum = partitionNum;
this.numCorePartitions = numCorePartitions == null ? UNKNOWN_NUM_CORE_PARTITIONS : numCorePartitions;
}
Expand Down Expand Up @@ -117,24 +108,6 @@ public boolean isNumCorePartitionsUnknown()
return numCorePartitions == UNKNOWN_NUM_CORE_PARTITIONS;
}

@Override
public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
return createLookup(shardSpecs);
}

private static ShardSpecLookup createLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (((DimensionRangeShardSpec) spec).isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

@Override
public List<String> getDomainDimensions()
{
Expand Down Expand Up @@ -279,33 +252,6 @@ public <T> PartitionChunk<T> createChunk(T obj)
}
}

private boolean isInChunk(InputRow inputRow)
{
return isInChunk(dimensions, start, end, inputRow);
}

public static boolean isInChunk(
List<String> dimensions,
@Nullable StringTuple start,
@Nullable StringTuple end,
InputRow inputRow
)
{
final String[] inputDimensionValues = new String[dimensions.size()];
for (int i = 0; i < dimensions.size(); ++i) {
// Get the values of this dimension, treat multiple values as null
List<String> values = inputRow.getDimension(dimensions.get(i));
inputDimensionValues[i] = values != null && values.size() == 1 ? values.get(0) : null;
}
final StringTuple inputRowTuple = StringTuple.create(inputDimensionValues);

int inputVsStart = inputRowTuple.compareTo(start);
int inputVsEnd = inputRowTuple.compareTo(end);

return (inputVsStart >= 0 || start == null)
&& (inputVsEnd < 0 || end == null);
}

@Override
public String getType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.data.input.StringTuple;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Collections;
import java.util.Objects;

/**
* See {@link BucketNumberedShardSpec} for how this class is used.
*
* @see BuildingSingleDimensionShardSpec
*/
public class SingleDimensionRangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSingleDimensionShardSpec>
public class SingleDimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec
implements BucketNumberedShardSpec<BuildingSingleDimensionShardSpec>
{
private final int bucketId;
private final String dimension;
Expand All @@ -50,6 +50,11 @@ public SingleDimensionRangeBucketShardSpec(
@JsonProperty("end") @Nullable String end
)
{
super(
dimension == null ? Collections.emptyList() : Collections.singletonList(dimension),
start == null ? null : StringTuple.create(start),
end == null ? null : StringTuple.create(end)
);
this.bucketId = bucketId;
this.dimension = dimension;
this.start = start;
Expand Down Expand Up @@ -89,24 +94,6 @@ public BuildingSingleDimensionShardSpec convert(int partitionId)
return new BuildingSingleDimensionShardSpec(bucketId, dimension, start, end, partitionId);
}

public boolean isInChunk(InputRow inputRow)
{
return SingleDimensionShardSpec.isInChunk(dimension, start, end, inputRow);
}

@Override
public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (((SingleDimensionRangeBucketShardSpec) spec).isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

@Override
public String getType()
{
Expand Down
Loading