Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
edafd16
Fill in the core partition set size properly for batch ingestion with
jihoonson Jun 6, 2020
771eec1
Merge branch 'master' of github.com:apache/druid into dynamic-partiti…
jihoonson Jun 10, 2020
f1aeddb
incomplete javadoc
jihoonson Jun 10, 2020
42e37e2
Address comments
jihoonson Jun 10, 2020
d5e197e
fix tests
jihoonson Jun 11, 2020
99842f9
fix json serde, add tests
jihoonson Jun 11, 2020
b2222b9
checkstyle
jihoonson Jun 11, 2020
f8ac529
Set core partition set size for hash-partitioned segments properly in
jihoonson Jun 11, 2020
4600bd6
test for both parallel and single-threaded task
jihoonson Jun 11, 2020
7448773
unused variables
jihoonson Jun 11, 2020
08e2635
fix test
jihoonson Jun 12, 2020
30cae0e
unused imports
jihoonson Jun 12, 2020
5664a6c
add hash/range buckets
jihoonson Jun 13, 2020
f2c9bf3
some test adjustment and missing json serde
jihoonson Jun 13, 2020
af2b695
centralized partition id allocation in parallel and simple tasks
jihoonson Jun 13, 2020
209ad92
Merge branch 'master' of github.com:apache/druid into hash-partition-…
jihoonson Jun 13, 2020
ae87c94
remove string partition chunk
jihoonson Jun 13, 2020
dbe705a
revive string partition chunk
jihoonson Jun 13, 2020
71e2324
fill numCorePartitions for hadoop
jihoonson Jun 13, 2020
d6655af
clean up hash stuffs
jihoonson Jun 13, 2020
7d0d3b9
resolved todos
jihoonson Jun 13, 2020
6cff5b8
javadocs
jihoonson Jun 13, 2020
07a9e1b
Fix tests
jihoonson Jun 13, 2020
6e0ac6e
add more tests
jihoonson Jun 14, 2020
4a27771
doc
jihoonson Jun 14, 2020
7eaed9b
unused imports
jihoonson Jun 14, 2020
2db0a77
Allow append to existing datasources when dynamic partitioning is used
jihoonson Jun 14, 2020
374a466
fix test
jihoonson Jun 14, 2020
1106c92
checkstyle
jihoonson Jun 14, 2020
f109a5a
checkstyle
jihoonson Jun 14, 2020
19d6546
fix test
jihoonson Jun 14, 2020
4d56a88
fix test
jihoonson Jun 15, 2020
0475a63
fix other tests..
jihoonson Jun 15, 2020
4da2f8b
checkstyle
jihoonson Jun 15, 2020
f246bbb
handle unknown core partitions size in overlord segment allocation
jihoonson Jun 15, 2020
d85a49a
fail to append when numCorePartitions is unknown
jihoonson Jun 17, 2020
5f48e44
log
jihoonson Jun 17, 2020
4af415f
fix comment; rename to be more intuitive
jihoonson Jun 17, 2020
21dea74
double append test
jihoonson Jun 18, 2020
8dc81b5
Merge branch 'master' of github.com:apache/druid into append-dynamic
jihoonson Jun 19, 2020
402c2dc
cleanup complete(); add tests
jihoonson Jun 23, 2020
92a6e90
fix build
jihoonson Jun 23, 2020
b423e96
add tests
jihoonson Jun 24, 2020
b4e702b
address comments
jihoonson Jun 25, 2020
9025e3b
checkstyle
jihoonson Jun 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/druid/segment/SegmentUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Utility methods useful for implementing deep storage extensions.
Expand Down Expand Up @@ -93,6 +97,15 @@ public static Object commaSeparatedIdentifiers(@Nullable final Collection<DataSe
return Collections2.transform(segments, DataSegment::getId);
}

/**
 * Groups the given segments by their time interval.
 *
 * @param segments segments to group
 * @return a map from each distinct interval to the list of segments covering that interval,
 *         in the iteration order of {@code segments}
 */
public static Map<Interval, List<DataSegment>> groupSegmentsByInterval(Collection<DataSegment> segments)
{
  final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
  for (DataSegment segment : segments) {
    intervalToSegments.computeIfAbsent(segment.getInterval(), interval -> new ArrayList<>()).add(segment);
  }
  return intervalToSegments;
}

private SegmentUtils()
{
// no instantiation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public DataSegment(
List<String> dimensions,
List<String> metrics,
ShardSpec shardSpec,
CompactionState lastCompactionState,
@Nullable CompactionState lastCompactionState,
Integer binaryVersion,
long size
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,4 @@ default boolean possibleInDomain(Map<String, RangeSet<String>> domain)
{
throw new UnsupportedOperationException();
}

@Override
default boolean isCompatible(Class<? extends ShardSpec> other)
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,4 @@ default boolean possibleInDomain(Map<String, RangeSet<String>> domain)
{
throw new UnsupportedOperationException();
}

@Override
default boolean isCompatible(Class<? extends ShardSpec> other)
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,18 @@ public int getNumBuckets()
}

@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
// The shardSpec is created by the Overlord.
// For batch tasks, this code can be executed only with segment locking (forceTimeChunkLock = false).
// In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
// the same datasource. Since there is no restriction for those tasks in segment allocation, the
// allocated IDs for each task can interleave. As a result, the core partition set cannot be
// represented as a range. We always set 0 for the core partition set size if this is an initial segment.
return new HashBasedNumberedShardSpec(
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1,
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getNumCorePartitions(),
partitionId,
numCorePartitions,
bucketId,
numBuckets,
partitionDimensions,
objectMapper
);
}

@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
{
return new HashBasedNumberedShardSpec(partitionId, 0, bucketId, numBuckets, partitionDimensions, objectMapper);
}

@Override
public Class<? extends ShardSpec> getShardSpecClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec

@JsonCreator
public HashBasedNumberedShardSpec(
@JsonProperty("partitionNum") int partitionNum, // partitionId, hash bucketId
@JsonProperty("partitionNum") int partitionNum, // partitionId
@JsonProperty("partitions") int partitions, // core partition set size
@JsonProperty("bucketId") @Nullable Integer bucketId, // nullable for backward compatibility
@JsonProperty("numBuckets") @Nullable Integer numBuckets, // nullable for backward compatibility
Expand Down Expand Up @@ -90,12 +90,6 @@ public List<String> getPartitionDimensions()
return partitionDimensions;
}

@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == HashBasedNumberedShardSpec.class;
}

@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import javax.annotation.Nullable;

public class LinearPartialShardSpec implements PartialShardSpec
{
private static final LinearPartialShardSpec INSTANCE = new LinearPartialShardSpec();
Expand All @@ -37,16 +35,9 @@ private LinearPartialShardSpec()
}

@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
{
return new LinearShardSpec(
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1
);
}

@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
// numCorePartitions is ignored
return new LinearShardSpec(partitionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
return true;
}

@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == LinearShardSpec.class;
}

@Override
public <T> PartitionChunk<T> createChunk(T obj)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
return true;
}

@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == NoneShardSpec.class;
}

@Override
public boolean equals(Object obj)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;

public class NumberedOverwritePartialShardSpec implements PartialShardSpec
{
Expand All @@ -43,6 +42,12 @@ public NumberedOverwritePartialShardSpec(
this.minorVersion = minorVersion;
}

/**
 * Convenience constructor that accepts {@code minorVersion} as an int and narrows it to short
 * before delegating to the primary constructor. Intended for test code only.
 */
@VisibleForTesting
public NumberedOverwritePartialShardSpec(int startRootPartitionId, int endRootPartitionId, int minorVersion)
{
  this(startRootPartitionId, endRootPartitionId, (short) minorVersion);
}

@JsonProperty
public int getStartRootPartitionId()
{
Expand All @@ -62,29 +67,25 @@ public short getMinorVersion()
}

@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
// specOfPreviousMaxPartitionId is the max partitionId of the same shardSpec
// and could be null if all existing segments are first-generation segments.
return new NumberedOverwriteShardSpec(
specOfPreviousMaxPartitionId == null
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: specOfPreviousMaxPartitionId.getPartitionNum() + 1,
partitionId,
startRootPartitionId,
endRootPartitionId,
minorVersion
);
}

@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
public Class<? extends ShardSpec> getShardSpecClass()
{
return new NumberedOverwriteShardSpec(partitionId, startRootPartitionId, endRootPartitionId, minorVersion);
return NumberedOverwriteShardSpec.class;
}

@Override
public Class<? extends ShardSpec> getShardSpecClass()
public boolean useNonRootGenerationPartitionSpace()
{
return NumberedOverwriteShardSpec.class;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,6 @@ public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
return true;
}

@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == NumberedOverwriteShardSpec.class || other == NumberedShardSpec.class;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import javax.annotation.Nullable;

public class NumberedPartialShardSpec implements PartialShardSpec
{
private static final NumberedPartialShardSpec INSTANCE = new NumberedPartialShardSpec();
Expand All @@ -37,27 +35,9 @@ private NumberedPartialShardSpec()
}

@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
{
if (specOfPreviousMaxPartitionId == null) {
// The shardSpec is created by the Overlord.
// - For streaming ingestion tasks, the core partition set is always 0.
// - For batch tasks, this code is executed only with segment locking (forceTimeChunkLock = false).
// In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
// the same datasource. Since there is no restriction for those tasks in segment allocation, the
// allocated IDs for each task can interleave. As a result, the core partition set cannot be
// represented as a range. We always set 0 for the core partition set size.
return new NumberedShardSpec(0, 0);
} else {
final NumberedShardSpec prevSpec = (NumberedShardSpec) specOfPreviousMaxPartitionId;
return new NumberedShardSpec(prevSpec.getPartitionNum() + 1, prevSpec.getNumCorePartitions());
}
}

@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
return new NumberedShardSpec(partitionId, 0);
return new NumberedShardSpec(partitionId, numCorePartitions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
return true;
}

@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == NumberedShardSpec.class || other == NumberedOverwriteShardSpec.class;
}

@Override
@JsonProperty("partitions")
public int getNumCorePartitions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,16 @@ default OverwriteShardSpec withAtomicUpdateGroupSize(int atomicUpdateGroupSize)
}

OverwriteShardSpec withAtomicUpdateGroupSize(short atomicUpdateGroupSize);

/**
 * Returns true if this shardSpec and the given {@link PartialShardSpec} share the same partition space.
 * This shardSpec uses the non-root-generation partition space, so it shares that space only with
 * partialShardSpecs that also use the non-root-generation space (i.e., those overwriting a subset of
 * segments in a time chunk); it does not share the root-generation space used by all other shardSpecs.
 *
 * @see PartitionIds
 */
@Override
default boolean sharePartitionSpace(PartialShardSpec partialShardSpec)
{
  return partialShardSpec.useNonRootGenerationPartitionSpace();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

import javax.annotation.Nullable;

/**
* This interface is used in the segment allocation protocol when it is coordinated by the Overlord; when appending
* segments to an existing datasource (either streaming ingestion or batch append) or any case when segment
Expand All @@ -45,21 +43,31 @@
public interface PartialShardSpec
{
/**
* Creates a new ShardSpec based on {@code specOfPreviousMaxPartitionId}. If it's null, it assumes that this is the
* first call for the time chunk where the new segment is created.
* Note that {@code specOfPreviousMaxPartitionId} can also be null for {@link OverwriteShardSpec} if all segments
* in the timeChunk are first-generation segments.
*/
ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId);

/**
* Creates a new shardSpec having the given partitionId.
* Creates a new ShardSpec with given partitionId and numCorePartitions.
*
* @param objectMapper jsonMapper used only for {@link HashBasedNumberedShardSpec}
* @param partitionId partitionId of the shardSpec. must be carefully chosen to be unique in a time chunk
* @param numCorePartitions the core partition set size. Should be set properly to determine if this segment belongs
* to the core partitions.
*/
ShardSpec complete(ObjectMapper objectMapper, int partitionId);
ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions);

/**
* Returns the class of the shardSpec created by this factory.
*/
@JsonIgnore
Class<? extends ShardSpec> getShardSpecClass();

/**
 * Returns true if this partialShardSpec needs a partitionId of a non-root generation.
 * Any partialShardSpec used to overwrite a subset of segments in a time chunk, such as
 * {@link NumberedOverwritePartialShardSpec}, should return true; all others use the default of false.
 *
 * @see PartitionIds
 */
default boolean useNonRootGenerationPartitionSpace()
{
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,14 @@ default short getAtomicUpdateGroupSize()
boolean possibleInDomain(Map<String, RangeSet<String>> domain);

/**
* Returns true if two segments of this and other shardSpecs can exist in the same time chunk.
* Returns true if this shardSpec and the given {@link PartialShardSpec} share the same partition space.
* All shardSpecs except {@link OverwriteShardSpec} use the root-generation partition space and thus share the same
* space.
*
* @see PartitionIds
*/
@JsonIgnore
boolean isCompatible(Class<? extends ShardSpec> other);
default boolean sharePartitionSpace(PartialShardSpec partialShardSpec)
{
return !partialShardSpec.useNonRootGenerationPartitionSpace();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be partialShardSpec.useNonRootGenerationPartitionSpace(); instead of !partialShardSpec.useNonRootGenerationPartitionSpace(); ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the java doc for ShardSpec#sharePartitionSpace and OverwriteShardSpec#sharePartitionSpace should be updated

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, all ShardSpecs share the same root-generation partition space except OverwriteShardSpec. So, the current code is correct. I updated the javadoc to make it more clear.

}
}
Loading