Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,14 @@ acceptedBreaks:
\ of type erasure and the original type is always returned"
"1.3.0":
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.PartitionField"
new: "class org.apache.iceberg.PartitionField"
justification: "Added a new field"
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.SortField"
new: "class org.apache.iceberg.SortField"
justification: "Added a new field"
- code: "java.class.removed"
old: "class org.apache.iceberg.actions.ImmutableDeleteOrphanFiles"
justification: "Moving from iceberg-api to iceberg-core"
Expand Down
47 changes: 47 additions & 0 deletions api/src/main/java/org/apache/iceberg/Accessors.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
*/
package org.apache.iceberg;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

/**
* Position2Accessor and Position3Accessor here is an optimization. For a nested schema like:
Expand Down Expand Up @@ -55,6 +59,49 @@ static Map<Integer, Accessor<StructLike>> forSchema(Schema schema) {
return TypeUtil.visit(schema, new BuildPositionAccessors());
}

/**
 * Returns an accessor that projects the given field ids from rows of {@code schema}.
 *
 * <p>Intended for multi-field projections; single-field access should go through
 * {@link Schema#accessorForField(int)}, which is why at least two ids are required here
 * (the message now matches the actual {@code length > 1} check).
 *
 * @param schema the row schema the accessor will read from
 * @param fieldIds ids of the fields to project; must contain at least two entries
 * @return an accessor producing a struct view of the selected fields
 * @throws IllegalArgumentException if fewer than two ids are given or an id is unknown
 */
static Accessor<StructLike> projectFields(Schema schema, int[] fieldIds) {
  Preconditions.checkArgument(
      fieldIds != null && fieldIds.length > 1, "At least two fields must be selected");
  List<Types.NestedField> fields =
      Arrays.stream(fieldIds)
          .mapToObj(
              id -> {
                Types.NestedField field = schema.findField(id);
                // Fail fast with the offending id instead of an NPE in StructType.of.
                Preconditions.checkArgument(field != null, "Cannot find field with id: %s", id);
                return field;
              })
          .collect(Collectors.toList());
  Types.StructType projected = Types.StructType.of(fields);
  // TODO: handle the case where all projected fields were deleted, which should
  // always produce a null struct.
  StructProjection projection = StructProjection.createAllowMissing(schema.asStruct(), projected);
  return new StructProjectionAccessor(projection, projected);
}

/**
 * Accessor that returns a projected struct view over selected fields of a row.
 *
 * <p>NOTE(review): {@link #get(StructLike)} reuses a single {@link StructProjection}
 * wrapper, so each call invalidates the previously returned value and the accessor is
 * not safe for concurrent use — confirm callers neither retain nor share results.
 */
private static class StructProjectionAccessor implements Accessor<StructLike> {
  private final StructProjection projection;
  private final Types.StructType type;

  StructProjectionAccessor(StructProjection projection, Types.StructType type) {
    this.projection = projection;
    this.type = type;
  }

  @Override
  public Object get(StructLike row) {
    // Wrap rather than copy: cheap, but the returned view is shared across calls.
    return projection.wrap(row);
  }

  @Override
  public Type type() {
    return type;
  }

  @Override
  public Class<?> javaClass() {
    // @Override added for consistency with the other Accessor methods above.
    return type.typeId().javaClass();
  }

  @Override
  public String toString() {
    String[] fieldNames =
        type.fields().stream().map(Types.NestedField::name).toArray(String[]::new);
    return "Accessor(fieldNames=" + Arrays.toString(fieldNames) + ", type=" + type + ")";
  }
}

private static class PositionAccessor implements Accessor<StructLike> {
private final int position;
private final Type type;
Expand Down
27 changes: 25 additions & 2 deletions api/src/main/java/org/apache/iceberg/PartitionField.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,32 @@
package org.apache.iceberg;

import java.io.Serializable;
import java.util.Arrays;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.transforms.Transform;

/** Represents a single field in a {@link PartitionSpec}. */
public class PartitionField implements Serializable {
private final int sourceId;
private final int[] sourceIds;
private final int fieldId;
private final String name;
private final Transform<?, ?> transform;

/** Creates a partition field sourced from a single schema column. */
PartitionField(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
  this.sourceId = sourceId;
  // Mirror the single id into array form so sourceIds() is always non-empty.
  this.sourceIds = new int[] {sourceId};
  this.fieldId = fieldId;
  this.name = name;
  this.transform = transform;
}

PartitionField(int[] sourceIds, int fieldId, String name, Transform<?, ?> transform) {
Preconditions.checkArgument(
sourceIds != null && sourceIds.length >= 1, "At least one source id should be provided");
this.sourceId = sourceIds.length > 1 ? -1 : sourceIds[0];
this.sourceIds = sourceIds;
this.fieldId = fieldId;
this.name = name;
this.transform = transform;
Expand All @@ -41,6 +55,10 @@ public int sourceId() {
return sourceId;
}

// Returns the ids of all source columns for this field (one entry for single-source
// transforms, several for multi-argument transforms).
// NOTE(review): exposes the internal array without a defensive copy — callers must not
// mutate it; confirm this matches the project's convention for array-returning getters.
public int[] sourceIds() {
  return sourceIds;
}

/** Returns the partition field id across all the table metadata's partition specs. */
public int fieldId() {
return fieldId;
Expand All @@ -58,7 +76,11 @@ public String name() {

@Override
public String toString() {
  // Single-source fields keep the legacy "transform(id)" rendering; multi-source
  // fields render the full id list, e.g. "transform([1, 2])".
  String source = sourceIds.length == 1 ? String.valueOf(sourceId) : Arrays.toString(sourceIds);
  return fieldId + ": " + name + ": " + transform + "(" + source + ")";
}

@Override
Expand All @@ -71,13 +93,14 @@ public boolean equals(Object other) {

PartitionField that = (PartitionField) other;
return sourceId == that.sourceId
&& Arrays.equals(sourceIds, that.sourceIds)
&& fieldId == that.fieldId
&& name.equals(that.name)
&& transform.toString().equals(that.transform.toString());
}

@Override
public int hashCode() {
  // Hash the sourceIds array by content with Arrays.hashCode: passing the int[]
  // directly to Objects.hashCode would use the array's identity hash, so two fields
  // that are equal (equals compares sourceIds with Arrays.equals) could hash
  // differently, violating the equals/hashCode contract.
  return Objects.hashCode(sourceId, Arrays.hashCode(sourceIds), fieldId, name, transform);
}
}
7 changes: 6 additions & 1 deletion api/src/main/java/org/apache/iceberg/PartitionKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ public PartitionKey(PartitionSpec spec, Schema inputSchema) {
Schema schema = spec.schema();
for (int i = 0; i < size; i += 1) {
PartitionField field = fields.get(i);
Accessor<StructLike> accessor = inputSchema.accessorForField(field.sourceId());
Accessor<StructLike> accessor;
if (field.sourceIds().length == 1) {
accessor = inputSchema.accessorForField(field.sourceId());
} else {
accessor = inputSchema.accessorForFields(field.sourceIds());
}
Preconditions.checkArgument(
accessor != null,
"Cannot build accessor for field: " + schema.findField(field.sourceId()));
Expand Down
91 changes: 65 additions & 26 deletions api/src/main/java/org/apache/iceberg/PartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
Expand Down Expand Up @@ -103,7 +104,11 @@ public UnboundPartitionSpec toUnbound() {

for (PartitionField field : fields) {
builder.addField(
field.transform().toString(), field.sourceId(), field.fieldId(), field.name());
field.transform().toString(),
field.sourceId(),
field.sourceIds(),
field.fieldId(),
field.name());
}

return builder.build();
Expand Down Expand Up @@ -354,24 +359,20 @@ private int nextFieldId() {
return lastAssignedFieldId.incrementAndGet();
}

/** Reserves a partition name that is not tied to any specific source column. */
private void checkAndAddPartitionName(String name) {
  // Pass an explicit empty array: a null literal would bind to the int... parameter
  // as the whole array and NPE on sourceIds.length in the varargs overload.
  checkAndAddPartitionName(name, new int[0]);
}

// Enables or disables name-conflict checking between partition fields and schema
// fields; returns this builder for chaining.
Builder checkConflicts(boolean check) {
  checkConflicts = check;
  return this;
}

private void checkAndAddPartitionName(String name, Integer sourceColumnId) {
private void checkAndAddPartitionName(String name, int... sourceIds) {
Types.NestedField schemaField = schema.findField(name);
if (checkConflicts) {
if (sourceColumnId != null) {
if (sourceIds.length == 1) {
// for identity transform case we allow conflicts between partition and schema field name
// as
// long as they are sourced from the same schema field
Preconditions.checkArgument(
schemaField == null || schemaField.fieldId() == sourceColumnId,
schemaField == null || schemaField.fieldId() == sourceIds[0],
"Cannot create identity partition sourced from different field in schema: %s",
name);
} else {
Expand Down Expand Up @@ -504,6 +505,20 @@ public Builder hour(String sourceName) {
return hour(sourceName, sourceName + "_hour");
}

/**
 * Adds a bucket partition field over multiple source columns.
 *
 * @param sourceNames names of the source columns to bucket together
 * @param numBuckets number of buckets
 * @param targetName name of the resulting partition field
 * @return this for method chaining
 */
public Builder bucket(String[] sourceNames, int numBuckets, String targetName) {
  // Reserve the partition name up front, mirroring the single-column bucket overload;
  // without this a multi-column bucket could silently reuse an existing field name.
  checkAndAddPartitionName(targetName);
  Types.NestedField[] sourceColumns = new Types.NestedField[sourceNames.length];
  int[] sourceColumnIds = new int[sourceNames.length];
  for (int i = 0; i < sourceNames.length; i++) {
    sourceColumns[i] = findSourceColumn(sourceNames[i]);
    sourceColumnIds[i] = sourceColumns[i].fieldId();
  }
  // The transform buckets over the struct of all source columns.
  Types.StructType type = Types.StructType.of(sourceColumns);
  fields.add(
      new PartitionField(
          sourceColumnIds, nextFieldId(), targetName, Transforms.bucket(type, numBuckets)));
  return this;
}

public Builder bucket(String sourceName, int numBuckets, String targetName) {
checkAndAddPartitionName(targetName);
Types.NestedField sourceColumn = findSourceColumn(sourceName);
Expand All @@ -520,6 +535,16 @@ public Builder bucket(String sourceName, int numBuckets) {
return bucket(sourceName, numBuckets, sourceName + "_bucket");
}

/**
 * Adds a bucket partition field over one or more source columns, generating the
 * partition field name from the column names.
 *
 * @param sourceNames names of the source columns; must contain at least one entry
 * @param numBuckets number of buckets
 * @return this for method chaining
 */
public Builder bucket(String[] sourceNames, int numBuckets) {
  // Give the precondition a message so callers see why the argument was rejected.
  Preconditions.checkArgument(
      sourceNames != null && sourceNames.length >= 1,
      "Cannot create bucket partition: at least one source column is required");
  if (sourceNames.length == 1) {
    // Single column: delegate so the legacy "<col>_bucket" naming and the identity
    // name-conflict handling are preserved.
    return bucket(sourceNames[0], numBuckets);
  } else {
    // Multi-column: name is "<col1>_<col2>..._bucket".
    String targetName = Joiner.on("_").join(sourceNames);
    return bucket(sourceNames, numBuckets, targetName + "_bucket");
  }
}

public Builder truncate(String sourceName, int width, String targetName) {
checkAndAddPartitionName(targetName);
Types.NestedField sourceColumn = findSourceColumn(sourceName);
Expand Down Expand Up @@ -556,13 +581,24 @@ Builder add(int sourceId, String name, Transform<?, ?> transform) {
return add(sourceId, nextFieldId(), name, transform);
}

// Adds a multi-source partition field, auto-assigning the next partition field id.
Builder add(int[] sourceIds, String name, Transform<?, ?> transform) {
  return add(sourceIds, nextFieldId(), name, transform);
}

// Adds a single-source partition field with an explicit field id.
Builder add(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
  // Validate/reserve the partition name against its single source column.
  checkAndAddPartitionName(name, sourceId);
  fields.add(new PartitionField(sourceId, fieldId, name, transform));
  // Track the max assigned id so later auto-assigned field ids don't collide.
  lastAssignedFieldId.getAndAccumulate(fieldId, Math::max);
  return this;
}

// Adds a multi-source partition field with an explicit field id.
Builder add(int[] sourceIds, int fieldId, String name, Transform<?, ?> transform) {
  // Validate/reserve the partition name against all of its source columns.
  checkAndAddPartitionName(name, sourceIds);
  fields.add(new PartitionField(sourceIds, fieldId, name, transform));
  // Track the max assigned id so later auto-assigned field ids don't collide.
  lastAssignedFieldId.getAndAccumulate(fieldId, Math::max);
  return this;
}

public PartitionSpec build() {
PartitionSpec spec = buildUnchecked();
checkCompatibility(spec, schema);
Expand All @@ -576,25 +612,28 @@ PartitionSpec buildUnchecked() {

static void checkCompatibility(PartitionSpec spec, Schema schema) {
for (PartitionField field : spec.fields) {
Type sourceType = schema.findType(field.sourceId());
Transform<?, ?> transform = field.transform();
// In the case of a Version 1 partition-spec field gets deleted,
// it is replaced with a void transform, see:
// https://iceberg.apache.org/spec/#partition-transforms
// We don't care about the source type since a VoidTransform is always compatible and skip the
// checks
if (!transform.equals(Transforms.alwaysNull())) {
ValidationException.check(
sourceType != null, "Cannot find source column for partition field: %s", field);
ValidationException.check(
sourceType.isPrimitiveType(),
"Cannot partition by non-primitive source field: %s",
sourceType);
ValidationException.check(
transform.canTransform(sourceType),
"Invalid source type %s for transform: %s",
sourceType,
transform);
for (int id : field.sourceIds()) {
Type sourceType = schema.findType(id);
// In the case of a Version 1 partition-spec field gets deleted,
// it is replaced with a void transform, see:
// https://iceberg.apache.org/spec/#partition-transforms
// We don't care about the source type since a VoidTransform is always compatible and skip
// the
// checks
if (!transform.equals(Transforms.alwaysNull())) {
ValidationException.check(
sourceType != null, "Cannot find source column for partition field: %s", field);
ValidationException.check(
sourceType.isPrimitiveType(),
"Cannot partition by non-primitive source field: %s",
sourceType);
ValidationException.check(
transform.canTransform(sourceType),
"Invalid source type %s for transform: %s",
sourceType,
transform);
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,13 @@ public Accessor<StructLike> accessorForField(int id) {
return lazyIdToAccessor().get(id);
}

/**
 * Returns an accessor that reads the given field ids from rows of this schema.
 *
 * <p>A single id delegates to the plain per-field accessor; multiple ids produce a
 * struct projection over the selected fields.
 */
public Accessor<StructLike> accessorForFields(int[] ids) {
  return ids.length == 1 ? accessorForField(ids[0]) : Accessors.projectFields(this, ids);
}

/**
* Creates a projection schema for a subset of columns, selected by name.
*
Expand Down
Loading