
Use PartitionsSpec for all task types #8141

Merged
jihoonson merged 9 commits into apache:master from jihoonson:partitions-spec
Jul 31, 2019

Conversation

@jihoonson
Contributor

@jihoonson jihoonson commented Jul 23, 2019

Part of #8061.

Description

PartitionsSpec is a class that describes the secondary partitioning method for data ingestion, but it is currently used only by Hadoop tasks. For more consistent behavior and configuration, all task types should use the same PartitionsSpec.

PartitionsSpec is the top-level interface and has one direct implementation, DynamicPartitionsSpec. DynamicPartitionsSpec is the new partitionsSpec used by indexTask and the Kafka/Kinesis index tasks.

DimensionBasedPartitionsSpec is a child interface of PartitionsSpec and represents partitioning based on dimension values. It has two implementations, HashedPartitionsSpec and SingleDimensionPartitionsSpec. These partitionsSpecs are used if and only if perfect rollup is configured.
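The hierarchy described above can be sketched as follows. The type names come from the PR description and the method is the one quoted in the review below, but the constructor shape and return values here are illustrative assumptions, not Druid's exact implementation:

```java
// Minimal sketch of the PartitionsSpec hierarchy (illustrative, not Druid's actual code).
public class PartitionsSpecSketch
{
  interface PartitionsSpec
  {
    // True if the number of partitions must be determined before ingestion starts.
    boolean isDeterminingPartitions();
  }

  // The new spec used by indexTask and the Kafka/Kinesis index tasks.
  static class DynamicPartitionsSpec implements PartitionsSpec
  {
    public boolean isDeterminingPartitions() { return false; }
  }

  // Partitioning based on dimension values; used if and only if perfect rollup is configured.
  interface DimensionBasedPartitionsSpec extends PartitionsSpec {}

  static class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
  {
    final Integer numShards; // partitions must be determined when this is null
    HashedPartitionsSpec(Integer numShards) { this.numShards = numShards; }
    public boolean isDeterminingPartitions() { return numShards == null; }
  }

  static class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSpec
  {
    public boolean isDeterminingPartitions() { return true; }
  }

  public static void main(String[] args)
  {
    System.out.println(new DynamicPartitionsSpec().isDeterminingPartitions());    // false
    System.out.println(new HashedPartitionsSpec(null).isDeterminingPartitions()); // true
  }
}
```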

This PR is backward-incompatible for tasks that use IndexTuningConfig (indexTask, compactionTask, and parallelIndexTask), because the JSON form of tuningConfig no longer includes maxRowsPerSegment, maxTotalRows, numShards, and partitionDimensions at the top level. However, the old JSON format can still be read. It should remain compatible for other task types.
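As an illustration of the tuningConfig change (a sketch only: the field values, the `"dynamic"` type name, and the omission of all other tuningConfig fields are assumptions for this example), the old flat form:

```json
{
  "type": "index",
  "maxRowsPerSegment": 5000000,
  "maxTotalRows": 20000000
}
```

would be written after this PR as:

```json
{
  "type": "index",
  "partitionsSpec": {
    "type": "dynamic",
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": 20000000
  }
}
```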


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.

Member

@clintropolis clintropolis left a comment


this seems like a nice change 👍

@JsonIgnore
boolean isDeterminingPartitions();
/**
* Returns true if this partitionsSpec needs to determine the number of partitions to start data ingetsion.
Member


typo: ingetsion

Contributor Author


Thanks, fixed.

}

public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment)
public IndexTuningConfig withPartitoinsSpec(PartitionsSpec partitionsSpec)
Member


typo: withPartitoinsSpec

Contributor Author


Thanks, fixed.


if (addResult.isOk()) {
if (addResult.isPushRequired(tuningConfig)) {
final boolean isPushRequired =
Member


Any reason not to push this down into the AppenderatorDriverAddResult like it was previously? Is the other AppenderatorDriverAddResult.isPushRequired method still legitimately used?

Contributor Author


Hmm, yeah it looks better to use the existing one. Reverted to use it.
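The design choice settled on above (keeping the threshold check on the add result rather than inlining it at every call site) can be sketched as follows. The class and method names mirror the snippets in this thread, but the fields and the limit logic are simplified assumptions, not Druid's actual implementation:

```java
// Simplified sketch: the add result itself decides whether a push is needed,
// so every ingestion loop calls the same helper instead of duplicating the check.
public class AddResultSketch
{
  static class TuningConfig
  {
    final Integer maxRowsPerSegment;
    final Long maxTotalRows;
    TuningConfig(Integer maxRowsPerSegment, Long maxTotalRows)
    {
      this.maxRowsPerSegment = maxRowsPerSegment;
      this.maxTotalRows = maxTotalRows;
    }
  }

  static class AppenderatorDriverAddResult
  {
    final int numRowsInSegment;
    final long totalNumRows;
    AppenderatorDriverAddResult(int numRowsInSegment, long totalNumRows)
    {
      this.numRowsInSegment = numRowsInSegment;
      this.totalNumRows = totalNumRows;
    }

    // Push when either the per-segment or the total row limit is reached (assumed logic).
    boolean isPushRequired(TuningConfig config)
    {
      boolean overSegmentLimit = config.maxRowsPerSegment != null
                                 && numRowsInSegment >= config.maxRowsPerSegment;
      boolean overTotalLimit = config.maxTotalRows != null
                               && totalNumRows >= config.maxTotalRows;
      return overSegmentLimit || overTotalLimit;
    }
  }

  public static void main(String[] args)
  {
    TuningConfig config = new TuningConfig(100, 1_000L);
    System.out.println(new AppenderatorDriverAddResult(100, 500).isPushRequired(config)); // true
    System.out.println(new AppenderatorDriverAddResult(50, 500).isPushRequired(config));  // false
  }
}
```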

// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
if (addResult.isPushRequired(tuningConfig) && !sequenceToUse.isCheckpointed()) {
final boolean isPushRequired =
Member


same thing re isPushRequired

Member

@clintropolis clintropolis left a comment


lgtm 👍

@dclim dclim self-assigned this Jul 25, 2019
this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
Preconditions.checkArgument(
PartitionsSpec.isEffectivelyNull(maxRowsPerSegment) || PartitionsSpec.isEffectivelyNull(numShards),
"Can't use maxRowsPerSegment and numShards together"
Contributor


When this is called through HadoopHashedPartitionsSpec, the field there is called targetPartitionSize instead of maxRowsPerSegment, so it might be clearer to indicate that name here as well in addition to maxRowsPerSegment

Contributor Author


👍

PartitionsSpec.isEffectivelyNull(maxRowsPerSegment) || PartitionsSpec.isEffectivelyNull(numShards),
"Can't use maxRowsPerSegment and numShards together"
);
// Needs to determin partitions if the _given_ numShards is null
Contributor


determin -> determine

Contributor Author


Thanks, fixed.

Preconditions.checkArgument(maxRowsPerSegment > 0, "maxRowsPerSegment must be specified");
this.maxRowsPerSegment = maxRowsPerSegment;
this.maxPartitionSize = PartitionsSpec.isEffectivelyNull(maxPartitionSize)
? Math.multiplyExact(maxRowsPerSegment, (int) (maxRowsPerSegment * 0.5))
Contributor


Is this calculation right? maxPartitionSize previously defaulted to 50% more than targetPartitionSize - this is targetPartitionSize * targetPartitionSize * 0.5

Contributor Author


Nice catch! This should be Math.addExact(maxRowsPerSegment, (int) (maxRowsPerSegment * 0.5)). Thanks.
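To make the difference concrete, here is a small check of both forms (the value 1_000 is just an illustrative maxRowsPerSegment, not taken from this PR):

```java
public class MaxPartitionSizeDefault
{
  public static void main(String[] args)
  {
    int maxRowsPerSegment = 1_000; // illustrative value

    // Buggy form from the snippet above: multiplies instead of adds,
    // yielding maxRowsPerSegment^2 * 0.5 (and risking int overflow for large values).
    int buggy = Math.multiplyExact(maxRowsPerSegment, (int) (maxRowsPerSegment * 0.5));

    // Corrected form from the reply: 50% more than maxRowsPerSegment.
    int fixed = Math.addExact(maxRowsPerSegment, (int) (maxRowsPerSegment * 0.5));

    System.out.println(buggy); // 500000
    System.out.println(fixed); // 1500
  }
}
```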

} else {
if (forceGuaranteedRollup) {
if (!(partitionsSpec instanceof HashedPartitionsSpec)) {
throw new ISE("HashedPartitonsSpec must be used for perfect rollup");
Contributor


Typo: HashedPartitionsSpec

Contributor Author


Thanks, fixed.

@dclim
Contributor

dclim commented Jul 30, 2019

👍 after conflict resolved

@jihoonson jihoonson merged commit 385f492 into apache:master Jul 31, 2019
@jihoonson
Contributor Author

@dclim @clintropolis thank you for the review!
