Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline.partition;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.InputRow;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* This is a special shardSpec which is temporarily used during batch ingestion. In Druid, there is a concept
* of core partition set which is a set of segments atomically becoming queryable together in Brokers. The core
* partition set is represented as a range of partitionIds. For {@link NumberedShardSpec}, the core partition set
* is [0, {@link NumberedShardSpec#partitions}).
*
* The NumberedShardSpec is used for dynamic partitioning which is based on the number of rows in each segment.
* In streaming ingestion, the core partition set size cannot be determined since it's impossible to know how many
* segments will be created per time chunk. However, in batch ingestion with time chunk locking, the core partition
* set is the set of segments created by an initial task or an overwriting task. Since the core partition set is
* determined when the task publishes segments at the end, the task postpones creating proper NumberedShardSpec
* until the end.
*
* This shardSpec is used for such a use case. A non-appending batch task can use this shardSpec until it finally
* publishes segments. When it publishes segments, it should convert the shardSpec of those segments to NumberedShardSpec.
* See {@code SegmentPublisherHelper#annotateShardSpec} for converting to NumberedShardSpec. Note that, when
* the segment lock is used, the Overlord coordinates the segment allocation and this class is never used. Instead,
* the task sends {@link PartialShardSpec} to the Overlord to allocate a new segment. The result segment could have
* either a {@link ShardSpec} (for root generation segments) or an {@link OverwriteShardSpec} (for non-root
* generation segments).
*
* This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
*
* Finally, this shardSpec has only a partitionId, which is the same as {@link LinearShardSpec}. The difference between
* them is that this shardSpec should never be published, and so should never be used in other places such as the Broker timeline.
*
* @see NumberedShardSpec
*/
public class BuildingNumberedShardSpec implements ShardSpec
{
  public static final String TYPE = "building_numbered";

  private final int partitionId;

  /**
   * Creates a spec for the given partition ID.
   *
   * @throws IllegalArgumentException if partitionId is negative
   */
  @JsonCreator
  public BuildingNumberedShardSpec(@JsonProperty("partitionId") int partitionId)
  {
    Preconditions.checkArgument(partitionId >= 0, "partitionId >= 0");
    this.partitionId = partitionId;
  }

  /**
   * Converts this spec into a publishable {@link NumberedShardSpec}, once the size of
   * the core partition set is finally known at publish time.
   */
  public NumberedShardSpec toNumberedShardSpec(int numTotalPartitions)
  {
    return new NumberedShardSpec(partitionId, numTotalPartitions);
  }

  @Override
  public <T> PartitionChunk<T> createChunk(T obj)
  {
    // AppenderatorImpl can call this when it builds a sinkTimeline. The sinkTimeline
    // doesn't seem to be used in batch ingestion, so 'chunks' is fixed at 0 for now.
    return new NumberedPartitionChunk<>(partitionId, 0, obj);
  }

  @JsonProperty("partitionId")
  @Override
  public int getPartitionNum()
  {
    return partitionId;
  }

  @Override
  public ShardSpecLookup getLookup(List<ShardSpec> shardSpecs)
  {
    return NumberedShardSpec.createLookup(shardSpecs);
  }

  // The methods below are used only on the query side; this shardSpec never reaches
  // the query path, so calling any of them is an error.

  @Override
  public boolean isInChunk(long timestamp, InputRow inputRow)
  {
    throw new UnsupportedOperationException();
  }

  @JsonIgnore
  @Override
  public List<String> getDomainDimensions()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean isCompatible(Class<? extends ShardSpec> other)
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean equals(Object o)
  {
    if (o == this) {
      return true;
    }
    if (o == null || o.getClass() != getClass()) {
      return false;
    }
    final BuildingNumberedShardSpec that = (BuildingNumberedShardSpec) o;
    return this.partitionId == that.partitionId;
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(partitionId);
  }

  @Override
  public String toString()
  {
    return "BuildingNumberedShardSpec{" + "partitionId=" + partitionId + '}';
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,36 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.timeline.DataSegment;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* ShardSpec for segments which overshadow others with their minorVersion.
* This shardSpec is used only for the segments created by overwriting tasks with segment lock enabled.
* When the segment lock is used, there is a concept of atomic update group which is a set of segments atomically
* becoming queryable together in Brokers. It is a similar concept to the core partition set (explained in
* {@link NumberedShardSpec}), but differs in that there is only one core partition set per time chunk
* while there could be multiple atomic update groups in one time chunk.
*
* The atomic update group has the root partition range and the minor version to determine the visibility between
* atomic update groups; the group of the highest minor version in the same root partition range becomes queryable
* when they have the same major version ({@link DataSegment#getVersion()}).
*
* Note that this shardSpec is used only when you overwrite existing segments with segment lock enabled.
* If the task doesn't overwrite segments, it will use NumberedShardSpec instead even when segment lock is used.
* Similar to NumberedShardSpec, the size of the atomic update group is determined when the task publishes segments
* at the end of ingestion. As a result, {@link #atomicUpdateGroupSize} is set to
* {@link PartitionIds#UNKNOWN_ATOMIC_UPDATE_GROUP_SIZE} first, and updated when publishing segments
* in {@code SegmentPublisherHelper#annotateShardSpec}.
*
* @see AtomicUpdateGroup
*/
public class NumberedOverwriteShardSpec implements OverwriteShardSpec
{
public static final String TYPE = "numbered_overwrite";
private final int partitionId;

private final short startRootPartitionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ private NumberedPartialShardSpec()
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
{
if (specOfPreviousMaxPartitionId == null) {
// The shardSpec is created by the Overlord.
// - For streaming ingestion tasks, the core partition set size is always 0.
// - For batch tasks, this code is executed only with segment locking (forceTimeChunkLock = false).
// In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
// the same datasource. Since there is no restriction for those tasks in segment allocation, the
// allocated IDs for each task can interleave. As a result, the core partition set cannot be
// represented as a range. We always set 0 for the core partition set size.
return new NumberedShardSpec(0, 0);
} else {
final NumberedShardSpec prevSpec = (NumberedShardSpec) specOfPreviousMaxPartitionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public int getPartitionNum()

/**
 * Returns a {@link ShardSpecLookup} over the given shardSpecs.
 * Delegates to {@link #createLookup}, which is also shared with other shardSpec
 * implementations (e.g. BuildingNumberedShardSpec) so they resolve rows the same way.
 */
@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
  return createLookup(shardSpecs);
}

/**
 * Creates a lookup that ignores the timestamp and row and always returns the first
 * shardSpec in the list — presumably any row may land in any numbered partition, so
 * the choice is arbitrary (TODO confirm). Note: throws IndexOutOfBoundsException at
 * lookup time if {@code shardSpecs} is empty.
 */
static ShardSpecLookup createLookup(List<ShardSpec> shardSpecs)
{
  return (long timestamp, InputRow row) -> shardSpecs.get(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
import javax.annotation.Nullable;

/**
* Class to contain all information of a {@link ShardSpec} except for the partition ID.
* This class is mainly used by the indexing tasks to allocate new segments using the Overlord.
* This interface is used in the segment allocation protocol when it is coordinated by the Overlord: when appending
* segments to an existing datasource (either streaming ingestion or batch append), or in any case where segment
* lock is used. Implementations of this interface contain all information of the corresponding {@link ShardSpec}
* except the partition ID.
* Ingestion tasks send all information required to allocate a new segment via this interface, and the Overlord
* determines the partition ID when creating the new segment.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.timeline.partition;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.RangeSet;
Expand All @@ -38,53 +39,83 @@
@JsonSubTypes.Type(name = "linear", value = LinearShardSpec.class),
@JsonSubTypes.Type(name = "numbered", value = NumberedShardSpec.class),
@JsonSubTypes.Type(name = "hashed", value = HashBasedNumberedShardSpec.class),
@JsonSubTypes.Type(name = "numbered_overwrite", value = NumberedOverwriteShardSpec.class)
@JsonSubTypes.Type(name = NumberedOverwriteShardSpec.TYPE, value = NumberedOverwriteShardSpec.class),
@JsonSubTypes.Type(name = BuildingNumberedShardSpec.TYPE, value = BuildingNumberedShardSpec.class)
})
public interface ShardSpec
{
  /**
   * Creates a {@link PartitionChunk} wrapping the given object and carrying this spec's
   * partitioning information, so the object can be placed in a timeline.
   */
  @JsonIgnore
  <T> PartitionChunk<T> createChunk(T obj);

  /**
   * Returns whether the given row at the given timestamp belongs to this shard.
   * NOTE(review): the timestamp presumably must be bucketed as described in
   * {@link ShardSpecLookup} — confirm with callers.
   */
  @JsonIgnore
  boolean isInChunk(long timestamp, InputRow inputRow);

  /**
   * Returns the partition ID of this segment.
   */
  int getPartitionNum();

  /**
   * Returns the start root partition ID of the atomic update group which this segment belongs to.
   *
   * @see AtomicUpdateGroup
   */
  default int getStartRootPartitionId()
  {
    return getPartitionNum();
  }

  /**
   * Returns the end root partition ID (exclusive; default is start + 1) of the atomic update
   * group which this segment belongs to.
   *
   * @see AtomicUpdateGroup
   */
  default int getEndRootPartitionId()
  {
    return getPartitionNum() + 1;
  }

  /**
   * Returns the minor version associated to the atomic update group which this segment belongs to.
   *
   * @see AtomicUpdateGroup
   */
  default short getMinorVersion()
  {
    return 0;
  }

  /**
   * Returns the size of the atomic update group which this segment belongs to.
   *
   * @see AtomicUpdateGroup
   */
  default short getAtomicUpdateGroupSize()
  {
    return 1;
  }

  /**
   * Returns a lookup for mapping (timestamp, row) pairs to one of the given shardSpecs.
   */
  @JsonIgnore
  ShardSpecLookup getLookup(List<ShardSpec> shardSpecs);

  /**
   * Returns the dimensions that have a known possible value range for the rows in this shard.
   *
   * @return list of dimensions with a known possible range; dimensions with unknown possible
   *         ranges are not listed
   */
  @JsonIgnore
  List<String> getDomainDimensions();

  /**
   * Returns false if the given domain ranges are not possible for rows in this shard;
   * otherwise returns true.
   *
   * @return possibility of in domain
   */
  @JsonIgnore
  boolean possibleInDomain(Map<String, RangeSet<String>> domain);

  /**
   * Returns true if two segments of this and other shardSpecs can exist in the same time chunk.
   */
  @JsonIgnore
  boolean isCompatible(Class<? extends ShardSpec> other);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,9 @@

/**
 * A functional mapping from a (timestamp, row) pair to the {@link ShardSpec} that should
 * receive the row. Instances are typically created as lambdas via
 * {@code ShardSpec#getLookup}.
 */
@FunctionalInterface
public interface ShardSpecLookup
{
  /**
   * Returns a {@link ShardSpec} for the given timestamp and the inputRow.
   * The timestamp must be bucketed using {@code GranularitySpec#getQueryGranularity}.
   */
  ShardSpec getShardSpec(long timestamp, InputRow row);
}
Loading