Merged

Commits (26)
edafd16  Fill in the core partition set size properly for batch ingestion with  (jihoonson, Jun 6, 2020)
771eec1  Merge branch 'master' of github.com:apache/druid into dynamic-partiti…  (jihoonson, Jun 10, 2020)
f1aeddb  incomplete javadoc  (jihoonson, Jun 10, 2020)
42e37e2  Address comments  (jihoonson, Jun 10, 2020)
d5e197e  fix tests  (jihoonson, Jun 11, 2020)
99842f9  fix json serde, add tests  (jihoonson, Jun 11, 2020)
b2222b9  checkstyle  (jihoonson, Jun 11, 2020)
f8ac529  Set core partition set size for hash-partitioned segments properly in  (jihoonson, Jun 11, 2020)
4600bd6  test for both parallel and single-threaded task  (jihoonson, Jun 11, 2020)
7448773  unused variables  (jihoonson, Jun 11, 2020)
08e2635  fix test  (jihoonson, Jun 12, 2020)
30cae0e  unused imports  (jihoonson, Jun 12, 2020)
5664a6c  add hash/range buckets  (jihoonson, Jun 13, 2020)
f2c9bf3  some test adjustment and missing json serde  (jihoonson, Jun 13, 2020)
af2b695  centralized partition id allocation in parallel and simple tasks  (jihoonson, Jun 13, 2020)
209ad92  Merge branch 'master' of github.com:apache/druid into hash-partition-…  (jihoonson, Jun 13, 2020)
ae87c94  remove string partition chunk  (jihoonson, Jun 13, 2020)
dbe705a  revive string partition chunk  (jihoonson, Jun 13, 2020)
71e2324  fill numCorePartitions for hadoop  (jihoonson, Jun 13, 2020)
d6655af  clean up hash stuffs  (jihoonson, Jun 13, 2020)
7d0d3b9  resolved todos  (jihoonson, Jun 13, 2020)
6cff5b8  javadocs  (jihoonson, Jun 13, 2020)
07a9e1b  Fix tests  (jihoonson, Jun 13, 2020)
6e0ac6e  add more tests  (jihoonson, Jun 14, 2020)
4a27771  doc  (jihoonson, Jun 14, 2020)
7eaed9b  unused imports  (jihoonson, Jun 14, 2020)
org/apache/druid/segment/loading/DataSegmentPusher.java
@@ -20,9 +20,11 @@
package org.apache.druid.segment.loading;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;

import java.io.File;
import java.io.IOException;
@@ -105,6 +107,13 @@ default List<String> getAllowedPropertyPrefixesForHadoop()
// on segment deletion if segment being deleted was the only segment
static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath)
{
// Sanity check for shardSpec type.
// BucketNumberedShardSpec should never be used in segment push.
Preconditions.checkArgument(
!(segment.getShardSpec() instanceof BucketNumberedShardSpec),
"Illegal shardSpec type[%s]",
segment.getShardSpec()
);
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
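The new sanity check fails fast before any storage path is built. Below is a tiny, self-contained sketch of the same Guava Preconditions pattern; the stand-in type and value are illustrative, since constructing a real DataSegment carrying a BucketNumberedShardSpec would take much more setup:

```java
import com.google.common.base.Preconditions;

public class ShardSpecSanityCheckDemo
{
  public static void main(String[] args)
  {
    Object shardSpec = Integer.valueOf(42); // stand-in for a BucketNumberedShardSpec

    // Same pattern as the check added to getDefaultStorageDir above: reject
    // the illegal type with a formatted message before building the path.
    // This throws: java.lang.IllegalArgumentException: Illegal shardSpec type[42]
    Preconditions.checkArgument(
        !(shardSpec instanceof Integer),
        "Illegal shardSpec type[%s]",
        shardSpec
    );
  }
}
```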
org/apache/druid/timeline/partition/BucketNumberedShardSpec.java (new file)
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline.partition;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.RangeSet;

import java.util.List;
import java.util.Map;

/**
 * This is one of the special shardSpecs temporarily used during batch ingestion. In Druid, there is a concept of
 * a core partition set, which is a set of segments that atomically become queryable together in Brokers. The core
 * partition set is represented as a range of partitionIds, i.e., [0, {@link ShardSpec#getNumCorePartitions()}).
 *
 * When you run a batch ingestion task with a non-linear partitioning scheme, the task populates all possible buckets
 * upfront (see {@code CachingLocalSegmentAllocator}) and uses them to partition input rows. However, if the data is
 * highly skewed, some of those buckets can remain empty even after the task has consumed all of its input. Since
 * Druid doesn't create empty segments, the partitionId must be allocated dynamically, only when a bucket is actually
 * in use, so that the core partition set is always packed with no missing partitionIds.
 *
 * This BucketNumberedShardSpec supports exactly that use case: a task with a non-linear partitioning scheme uses it
 * to postpone partitionId allocation until all empty buckets have been identified. See
 * {@code ParallelIndexSupervisorTask.groupGenericPartitionLocationsPerPartition} and
 * {@code CachingLocalSegmentAllocator} for parallel and sequential ingestion, respectively.
 *
 * Note that {@link org.apache.druid.timeline.SegmentId} requires a partitionId. Since the segmentId is used
 * everywhere during ingestion, this class implements {@link #getPartitionNum()} to return the bucketId. This is fine
 * before segments are pushed, because until then the segmentId only needs to identify each segment and the bucketId
 * is unique enough for that. However, when segments are pushed to deep storage, the partitionId is used to build the
 * segment path on deep storage ({@link org.apache.druid.segment.loading.DataSegmentPusher#getDefaultStorageDir}),
 * which must be correct. As a result, this shardSpec must never be used when pushing segments.
 *
 * This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
 *
 * This interface doesn't strictly have to extend {@link ShardSpec}; it does only because ShardSpec is used in many
 * places such as {@link org.apache.druid.timeline.DataSegment}, and modifying all of them to accept other types
 * would be quite invasive. This could be cleaned up in the future.
 *
* @see BuildingShardSpec
*/
public interface BucketNumberedShardSpec<T extends BuildingShardSpec> extends ShardSpec
{
int getBucketId();

T convert(int partitionId);

@Override
default <O> PartitionChunk<O> createChunk(O obj)
{
// The partitionId (or partitionNum, chunkNumber) is not determined yet. Use bucketId for now.
return new NumberedPartitionChunk<>(getBucketId(), 0, obj);
}

@Override
default int getPartitionNum()
{
// See the class-level javadoc for why this returns the bucketId.
return getBucketId();
}

@Override
default int getNumCorePartitions()
{
throw new UnsupportedOperationException();
}

// The methods below are used on the query side and must never be called on this shardSpec.

@JsonIgnore
@Override
default List<String> getDomainDimensions()
{
throw new UnsupportedOperationException();
}

@Override
default boolean possibleInDomain(Map<String, RangeSet<String>> domain)
{
throw new UnsupportedOperationException();
}

@Override
default boolean isCompatible(Class<? extends ShardSpec> other)
{
throw new UnsupportedOperationException();
}
}
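To make the delayed allocation concrete, here is a minimal, hypothetical sketch (not code from this PR) of the packing step the javadoc describes: partitionIds are handed out only to buckets that actually received rows, so the resulting core partition set has no gaps. The bucket numbers are made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PackedPartitionIdAllocationDemo
{
  public static void main(String[] args)
  {
    // Suppose 6 hash buckets were created upfront, but skewed data left
    // buckets 1, 3, and 4 empty after all input was consumed.
    List<Integer> nonEmptyBuckets = Arrays.asList(0, 2, 5);

    // Allocate partitionIds only for the buckets actually in use, so the
    // core partition set stays packed: [0, 3) with no missing partitionIds.
    Map<Integer, Integer> bucketToPartitionId = new HashMap<>();
    int nextPartitionId = 0;
    for (int bucketId : nonEmptyBuckets) {
      bucketToPartitionId.put(bucketId, nextPartitionId++);
    }

    // Prints {0=0, 2=1, 5=2}; each BucketNumberedShardSpec would now be
    // turned into a BuildingShardSpec via convert(partitionId).
    System.out.println(bucketToPartitionId);
  }
}
```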
org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java (new file)
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline.partition;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;

/**
* See {@link BuildingShardSpec} for how this class is used.
*
* @see HashBasedNumberedShardSpec
*/
public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<HashBasedNumberedShardSpec>
{
public static final String TYPE = "building_hashed";

private final int partitionId;
private final int bucketId;
private final int numBuckets;
private final List<String> partitionDimensions;
private final ObjectMapper jsonMapper;

@JsonCreator
public BuildingHashBasedNumberedShardSpec(
@JsonProperty("partitionId") int partitionId,
@JsonProperty("bucketId") int bucketId,
@JsonProperty("numBuckets") int numBuckets,
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
@JacksonInject ObjectMapper jsonMapper
)
{
this.partitionId = partitionId;
this.bucketId = bucketId;
this.numBuckets = numBuckets;
this.partitionDimensions = partitionDimensions == null
? HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS
: partitionDimensions;
this.jsonMapper = jsonMapper;
}

@JsonProperty("partitionId")
@Override
public int getPartitionNum()
{
return partitionId;
}

@Override
@JsonProperty
public int getBucketId()
{
return bucketId;
}

@JsonProperty
public int getNumBuckets()
{
return numBuckets;
}

@JsonProperty
public List<String> getPartitionDimensions()
{
return partitionDimensions;
}

@Override
public <T> PartitionChunk<T> createChunk(T obj)
{
// This method can be called in AppenderatorImpl to create a sinkTimeline.
// The sinkTimeline doesn't seem to be used in batch ingestion, so we set 'chunks' to 0 for now.
// HashBasedNumberedShardSpec uses NumberedPartitionChunk, so we use it here too.
return new NumberedPartitionChunk<>(partitionId, 0, obj);
}

@Override
public HashBasedNumberedShardSpec convert(int numCorePartitions)
{
return new HashBasedNumberedShardSpec(
partitionId,
numCorePartitions,
bucketId,
numBuckets,
partitionDimensions,
jsonMapper
);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BuildingHashBasedNumberedShardSpec that = (BuildingHashBasedNumberedShardSpec) o;
return partitionId == that.partitionId &&
bucketId == that.bucketId &&
numBuckets == that.numBuckets &&
Objects.equals(partitionDimensions, that.partitionDimensions);
}

@Override
public int hashCode()
{
return Objects.hash(partitionId, bucketId, numBuckets, partitionDimensions);
}

@Override
public String toString()
{
return "BuildingHashBasedNumberedShardSpec{" +
"partitionId=" + partitionId +
", bucketId=" + bucketId +
", numBuckets=" + numBuckets +
", partitionDimensions=" + partitionDimensions +
'}';
}
}
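A short usage sketch based only on the constructor and convert method shown above; the dimension name and the counts are illustrative:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;

import java.util.Collections;

public class BuildingHashShardSpecDemo
{
  public static void main(String[] args)
  {
    ObjectMapper jsonMapper = new ObjectMapper();

    // partitionId 0 was allocated to bucketId 2 out of numBuckets 4,
    // hash-partitioned on the "dim" column.
    BuildingHashBasedNumberedShardSpec building = new BuildingHashBasedNumberedShardSpec(
        0, 2, 4, Collections.singletonList("dim"), jsonMapper
    );

    // At publish time the task knows the core partition set size (say 3)
    // and converts to the publishable spec whose core set is [0, 3).
    HashBasedNumberedShardSpec published = building.convert(3);
    System.out.println(published);
  }
}
```

Given the TYPE constant and the @JsonProperty names above, the serialized form should look roughly like {"type": "building_hashed", "partitionId": 0, "bucketId": 2, "numBuckets": 4, "partitionDimensions": ["dim"]}, assuming the subtype is registered under that name.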
org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java
@@ -20,45 +20,20 @@
package org.apache.druid.timeline.partition;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.InputRow;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * This is a special shardSpec which is temporarily used during batch ingestion. In Druid, there is a concept
 * of core partition set which is a set of segments atomically becoming queryable together in Brokers. The core
 * partition set is represented as a range of partitionIds. For {@link NumberedShardSpec}, the core partition set
 * is [0, {@link NumberedShardSpec#partitions}).
 * See {@link BuildingShardSpec} for how this class is used.
 *
 * The NumberedShardSpec is used for dynamic partitioning, which is based on the number of rows in each segment.
 * In streaming ingestion, the core partition set size cannot be determined since it's impossible to know how many
 * segments will be created per time chunk. However, in batch ingestion with time chunk locking, the core partition
 * set is the set of segments created by an initial task or an overwriting task. Since the core partition set is
 * only determined when the task publishes segments, the task postpones creating a proper NumberedShardSpec until
 * then.
 *
 * This shardSpec is used for such a use case. A non-appending batch task can use this shardSpec until it finally
 * publishes segments, at which point it should convert the shardSpec of those segments to NumberedShardSpec.
 * See {@code SegmentPublisherHelper#annotateShardSpec} for the conversion to NumberedShardSpec. Note that, when
 * the segment lock is used, the Overlord coordinates the segment allocation and this class is never used. Instead,
 * the task sends {@link PartialShardSpec} to the Overlord to allocate a new segment. The resulting segment could
 * have either a {@link ShardSpec} (for root generation segments) or an {@link OverwriteShardSpec} (for non-root
 * generation segments).
 *
 * This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
 *
 * This shardSpec has only a partitionId, the same as {@link LinearShardSpec}. The difference between them is that
 * this shardSpec should never be published and thus never used in other places such as the Broker timeline.
*
* @see NumberedShardSpec
*/
public class BuildingNumberedShardSpec implements ShardSpec
public class BuildingNumberedShardSpec implements BuildingShardSpec<NumberedShardSpec>
{
public static final String TYPE = "building_numbered";

@@ -71,7 +46,15 @@ public BuildingNumberedShardSpec(@JsonProperty("partitionId") int partitionId)
this.partitionId = partitionId;
}

public NumberedShardSpec toNumberedShardSpec(int numTotalPartitions)
@Override
public int getBucketId()
{
// This method is currently never called for this shardSpec type.
throw new UnsupportedOperationException();
}

@Override
public NumberedShardSpec convert(int numTotalPartitions)
{
return new NumberedShardSpec(partitionId, numTotalPartitions);
}
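A minimal sketch of the conversion this method performs at publish time; the values are illustrative:

```java
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;

public class BuildingNumberedShardSpecDemo
{
  public static void main(String[] args)
  {
    // partitionId 5 was allocated during ingestion; at publish time the task
    // learns the time chunk ended up with 10 core partitions in total.
    BuildingNumberedShardSpec building = new BuildingNumberedShardSpec(5);

    // Equivalent to new NumberedShardSpec(5, 10): partitionNum=5, partitions=10.
    NumberedShardSpec published = building.convert(10);
    System.out.println(published);
  }
}
```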
@@ -91,39 +74,6 @@ public int getPartitionNum()
return partitionId;
}

@Override
public ShardSpecLookup getLookup(List<ShardSpec> shardSpecs)
{
return NumberedShardSpec.createLookup(shardSpecs);
}

// The methods below are used on the query side and must never be called on this shardSpec.

@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
throw new UnsupportedOperationException();
}

@JsonIgnore
@Override
public List<String> getDomainDimensions()
{
throw new UnsupportedOperationException();
}

@Override
public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
{
throw new UnsupportedOperationException();
}

@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
throw new UnsupportedOperationException();
}

@Override
public boolean equals(Object o)
{