Merged
@@ -19,6 +19,7 @@

package org.apache.druid.indexer.partitions;

import javax.annotation.Nullable;
import java.util.List;

/**
@@ -33,4 +34,7 @@ public interface DimensionBasedPartitionsSpec extends PartitionsSpec
String TARGET_PARTITION_SIZE = "targetPartitionSize";

List<String> getPartitionDimensions();

@Nullable
Integer getTargetRowsPerSegment();
}
@@ -54,9 +54,9 @@ public HashedPartitionsSpec(

// Deprecated properties preserved for backward compatibility:
@Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable
Integer targetPartitionSize,
Integer targetPartitionSize, // prefer targetRowsPerSegment
@Deprecated @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable
Integer maxRowsPerSegment
Integer maxRowsPerSegment // prefer targetRowsPerSegment
)
{
Integer adjustedTargetRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(targetRowsPerSegment);
@@ -112,6 +112,13 @@ public HashedPartitionsSpec(
this(null, numShards, partitionDimensions, null, maxRowsPerSegment);
}

@Nullable
@Override
public Integer getTargetRowsPerSegment()
{
return null;
}

@Nullable
@Override
@JsonProperty
@@ -30,8 +30,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.NAME, value = SingleDimensionPartitionsSpec.class),
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class),
// for backward compatibility
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class), // for backward compatibility
@JsonSubTypes.Type(name = HashedPartitionsSpec.NAME, value = HashedPartitionsSpec.class),
@JsonSubTypes.Type(name = DynamicPartitionsSpec.NAME, value = DynamicPartitionsSpec.class)
})
@@ -127,6 +127,7 @@ private static int resolveMaxRowsPerSegment(Property<Integer> target, Property<I
}

@JsonProperty
@Override
@Nullable
public Integer getTargetRowsPerSegment()
{
@@ -661,7 +661,7 @@ protected void innerReduce(Context context, SortableBytes keyBytes, Iterable<Dim
}

// See if we need to cut a new partition ending immediately before this dimension value
if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize()) {
clintropolis (Member) commented on Oct 9, 2019:

What is the justification for changing behavior that has existed for over 6 years? I didn't see anything in the PR description to answer the 'why', just a mention that it has been changed. Could you please update the PR description with whatever the answer is? It doesn't seem to make much difference and I have no opinions either way; I'm just curious what motivated this.

Contributor Author replied:

When I was adding tests, I was expecting partition sizes to match the expected target, but they were sized to one less than the target because of this line (a minimal sketch of the boundary difference follows below). In practice, since the partition sizes are much larger than 1, the difference is negligible, so I'm OK with reverting this change too.

Related to your suggestion, I'll update the PR description to explain why the single-dim partitioning is being changed to use targetRowsPerSegment.
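
To make the off-by-one concrete, here is a minimal standalone sketch (illustrative only, not the actual reducer code; the target of 5 rows and the single-row dimension values are assumptions for the example) of how cutting on `>=` versus `>` changes the resulting partition sizes:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: cut a partition *before* adding the next dimension value
// whenever the running row count would cross the target.
public class PartitionCutSketch
{
  // Returns the size of each partition produced for the given per-value row counts.
  static List<Integer> simulate(int[] rowsPerValue, int targetPartitionSize, boolean cutOnEquals)
  {
    List<Integer> partitionSizes = new ArrayList<>();
    int currentRows = 0;
    for (int rows : rowsPerValue) {
      boolean overLimit = cutOnEquals
                          ? currentRows + rows >= targetPartitionSize  // old comparison
                          : currentRows + rows > targetPartitionSize;  // new comparison
      if (currentRows > 0 && overLimit) {
        partitionSizes.add(currentRows);  // cut before adding this dimension value
        currentRows = 0;
      }
      currentRows += rows;
    }
    if (currentRows > 0) {
      partitionSizes.add(currentRows);
    }
    return partitionSizes;
  }

  public static void main(String[] args)
  {
    int target = 5;
    int[] rowsPerValue = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1};  // ten single-row dimension values

    System.out.println("cut on >= : " + simulate(rowsPerValue, target, true));   // [4, 4, 2]
    System.out.println("cut on >  : " + simulate(rowsPerValue, target, false));  // [5, 5]
  }
}
```

With `>=` every full partition ends one row short of the target; with `>` it can reach the target exactly.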

clintropolis (Member) replied:

Ah, I don't think it needs to be reverted; I just wanted to know the motivation. It is probably best to look at how other similar row-oriented indexing limits are handled in other indexing types, to make sure the behavior is consistent everywhere if possible.

Contributor Author replied:

Updated the PR description to mention the motivation for consistent behavior.

final ShardSpec shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
currentDimPartitionStart,
@@ -84,15 +84,15 @@ public class HadoopDruidIndexerConfig
{
private static final Injector INJECTOR;

public static final String CONFIG_PROPERTY = "druid.indexer.config";
public static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
public static final Splitter TAB_SPLITTER = Splitter.on("\t");
public static final Joiner TAB_JOINER = Joiner.on("\t");
static final String CONFIG_PROPERTY = "druid.indexer.config";
static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
static final Splitter TAB_SPLITTER = Splitter.on("\t");
static final Joiner TAB_JOINER = Joiner.on("\t");
public static final ObjectMapper JSON_MAPPER;
public static final IndexIO INDEX_IO;
public static final IndexMerger INDEX_MERGER_V9;
public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
static final IndexMerger INDEX_MERGER_V9;
static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
static final DataSegmentPusher DATA_SEGMENT_PUSHER;
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";


@@ -133,7 +133,7 @@ public static HadoopDruidIndexerConfig fromSpec(HadoopIngestionSpec spec)
return new HadoopDruidIndexerConfig(spec);
}

public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
private static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
// Eventually PathSpec needs to get rid of its Hadoop dependency, then maybe this can be ingested directly without
// the Map<> intermediary
@@ -226,14 +226,7 @@ public HadoopDruidIndexerConfig(
shardSpecLookups.put(
entry.getKey(), actualSpec.getLookup(
Lists.transform(
entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
{
@Override
public ShardSpec apply(HadoopyShardSpec input)
{
return input.getActualSpec();
}
}
entry.getValue(), HadoopyShardSpec::getActualSpec
)
)
);
@@ -298,7 +291,7 @@ public IndexSpec getIndexSpecForIntermediatePersists()
return schema.getTuningConfig().getIndexSpecForIntermediatePersists();
}

public boolean isOverwriteFiles()
boolean isOverwriteFiles()
{
return schema.getTuningConfig().isOverwriteFiles();
}
@@ -313,24 +306,30 @@ public Optional<List<Interval>> getIntervals()
{
Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
if (setOptional.isPresent()) {
return Optional.of((List<Interval>) JodaUtils.condenseIntervals(setOptional.get()));
return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
} else {
return Optional.absent();
}
}

public boolean isDeterminingPartitions()
boolean isDeterminingPartitions()
{
return schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true);
}

public int getTargetPartitionSize()
{
final Integer targetPartitionSize = schema.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment();
DimensionBasedPartitionsSpec spec = schema.getTuningConfig().getPartitionsSpec();
Contributor Author commented:

This is the only functional change in this file.
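
For context, a condensed standalone sketch of the resolution order this introduces (illustrative only, not Druid's API; the example values are made up): an explicit targetRowsPerSegment wins, otherwise the spec's maxRowsPerSegment is used, and -1 signals that neither is set.

```java
// Illustrative sketch; mirrors the resolution order in getTargetPartitionSize() above.
final class TargetSizeResolutionSketch
{
  static int resolveTargetPartitionSize(Integer targetRowsPerSegment, Integer maxRowsPerSegment)
  {
    if (targetRowsPerSegment != null) {
      return targetRowsPerSegment;                              // explicit target wins
    }
    return maxRowsPerSegment == null ? -1 : maxRowsPerSegment;  // fall back, or -1 if unset
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTargetPartitionSize(5_000_000, null));  // 5000000
    System.out.println(resolveTargetPartitionSize(null, 3_000_000));  // 3000000
    System.out.println(resolveTargetPartitionSize(null, null));       // -1
  }
}
```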


if (spec.getTargetRowsPerSegment() != null) {
return spec.getTargetRowsPerSegment();
}

final Integer targetPartitionSize = spec.getMaxRowsPerSegment();
return targetPartitionSize == null ? -1 : targetPartitionSize;
}

public boolean isForceExtendableShardSpecs()
boolean isForceExtendableShardSpecs()
{
return schema.getTuningConfig().isForceExtendableShardSpecs();
}
@@ -355,7 +354,7 @@ public HadoopyShardSpec getShardSpec(Bucket bucket)
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).get(bucket.partitionNum);
}

public int getShardSpecCount(Bucket bucket)
int getShardSpecCount(Bucket bucket)
{
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).size();
}
@@ -370,18 +369,18 @@ public int getMaxParseExceptions()
return schema.getTuningConfig().getMaxParseExceptions();
}

public boolean isUseYarnRMJobStatusFallback()
boolean isUseYarnRMJobStatusFallback()
{
return schema.getTuningConfig().isUseYarnRMJobStatusFallback();
}


public void setHadoopJobIdFileName(String hadoopJobIdFileName)
void setHadoopJobIdFileName(String hadoopJobIdFileName)
{
this.hadoopJobIdFileName = hadoopJobIdFileName;
}

public String getHadoopJobIdFileName()
String getHadoopJobIdFileName()
{
return hadoopJobIdFileName;
}
@@ -390,9 +389,6 @@ public String getHadoopJobIdFileName()
* Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
* or via injected system properties) before this method is called. The {@link PathSpec} may
* create objects which depend on the values of these configurations.
* @param job
* @return
* @throws IOException
*/
public Job addInputPaths(Job job) throws IOException
{
@@ -410,7 +406,7 @@ public Job addInputPaths(Job job) throws IOException
*
* @return the Bucket that this row belongs to
*/
public Optional<Bucket> getBucket(InputRow inputRow)
Optional<Bucket> getBucket(InputRow inputRow)
{
final Optional<Interval> timeBucket = schema.getDataSchema().getGranularitySpec().bucketInterval(
DateTimes.utc(inputRow.getTimestampFromEpoch())
@@ -436,13 +432,13 @@ public Optional<Bucket> getBucket(InputRow inputRow)

}

public Optional<Set<Interval>> getSegmentGranularIntervals()
Optional<Set<Interval>> getSegmentGranularIntervals()
{
return Optional.fromNullable(
(Set<Interval>) schema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.orNull()
schema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.orNull()
);
}

@@ -453,40 +449,35 @@ public List<Interval> getInputIntervals()
.inputIntervals();
}

public Optional<Iterable<Bucket>> getAllBuckets()
Optional<Iterable<Bucket>> getAllBuckets()
{
Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
if (intervals.isPresent()) {
return Optional.of(
(Iterable<Bucket>) FunctionalIterable
FunctionalIterable
.create(intervals.get())
.transformCat(
new Function<Interval, Iterable<Bucket>>()
{
@Override
public Iterable<Bucket> apply(Interval input)
{
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
if (specs == null) {
return ImmutableList.of();
}

return FunctionalIterable
.create(specs)
.transform(
new Function<HadoopyShardSpec, Bucket>()
{
int i = 0;
input -> {
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
if (specs == null) {
return ImmutableList.of();
}

return FunctionalIterable
.create(specs)
.transform(
new Function<HadoopyShardSpec, Bucket>()
{
int i = 0;

@Override
public Bucket apply(HadoopyShardSpec input)
{
return new Bucket(input.getShardNum(), bucketTime, i++);
}
@Override
public Bucket apply(HadoopyShardSpec input)
{
return new Bucket(input.getShardNum(), bucketTime, i++);
}
);
}
}
);
}
)
);
@@ -511,7 +502,7 @@ public String getWorkingPath()
* @return the intermediate path for this job run.
*/

public Path makeIntermediatePath()
Path makeIntermediatePath()
{
return new Path(
StringUtils.format(
@@ -524,7 +515,7 @@ public Path makeIntermediatePath()
);
}

public Path makeSegmentPartitionInfoPath(Interval bucketInterval)
Path makeSegmentPartitionInfoPath(Interval bucketInterval)
{
return new Path(
StringUtils.format(
@@ -536,7 +527,7 @@ public Path makeSegmentPartitionInfoPath(Interval bucketInterval)
);
}

public Path makeIntervalInfoPath()
Path makeIntervalInfoPath()
{
return new Path(
StringUtils.format(
@@ -546,27 +537,27 @@ public Path makeIntervalInfoPath()
);
}

public Path makeDescriptorInfoDir()
Path makeDescriptorInfoDir()
{
return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
}

public Path makeGroupedDataDir()
Path makeGroupedDataDir()
{
return new Path(makeIntermediatePath(), "groupedData");
}

public Path makeDescriptorInfoPath(DataSegment segment)
Path makeDescriptorInfoPath(DataSegment segment)
{
return new Path(makeDescriptorInfoDir(), StringUtils.removeChar(segment.getId() + ".json", ':'));
}

public void addJobProperties(Job job)
void addJobProperties(Job job)
{
addJobProperties(job.getConfiguration());
}

public void addJobProperties(Configuration conf)
void addJobProperties(Configuration conf)
{
for (final Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
@@ -597,7 +588,7 @@ public void verify()
Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version");
}

public List<String> getAllowedHadoopPrefix()
List<String> getAllowedHadoopPrefix()
{
return allowedHadoopPrefix;
}