Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ public static SortOrder buildSortOrder(Table table, SortOrder sortOrder) {
return buildSortOrder(table.schema(), table.spec(), sortOrder);
}

/**
 * Looks up a sort order defined on a {@link Table} that is equivalent to a user-supplied {@link
 * SortOrder}.
 *
 * @param table the table whose known sort orders are searched
 * @param userSuppliedSortOrder the sort order to match against the table's sort orders
 * @return the equivalent table {@link SortOrder} (carrying the table-assigned orderId) or {@link
 *     SortOrder#unsorted()} if the table defines no equivalent order
 */
public static SortOrder maybeFindTableSortOrder(Table table, SortOrder userSuppliedSortOrder) {
  // Iterate the table's sort orders in map-value order, mirroring stream().findFirst() semantics.
  for (SortOrder candidate : table.sortOrders().values()) {
    if (candidate.sameOrder(userSuppliedSortOrder)) {
      return candidate;
    }
  }
  return SortOrder.unsorted();
}

/**
* Build a final sort order that satisfies the clustering required by the partition spec.
*
Expand Down
64 changes: 64 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,68 @@ public void testSortOrderClusteringWithRedundantPartitionFieldsMissing() {
.as("Should add spec fields as prefix")
.isEqualTo(expected);
}

@Test
public void testFindSortOrderForTable() {
  // Create a table whose only sort order is id ASC NULLS LAST (order id 1).
  SortOrder idAscending =
      SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
  TestTables.TestTable table =
      TestTables.create(tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), idAscending, 2);

  // Matching the table's own current order must resolve to that exact order.
  SortOrder resolved = SortOrderUtil.maybeFindTableSortOrder(table, table.sortOrder());

  assertThat(resolved).as("Should find current table sort order").isEqualTo(table.sortOrder());
}

@Test
public void testFindSortOrderForTableWithoutFieldId() {
  SortOrder idAscending =
      SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
  TestTables.TestTable table =
      TestTables.create(tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), idAscending, 2);

  // Same ordering as the table's current order but built without an explicit order id.
  SortOrder unidentifiedOrder =
      SortOrder.builderFor(table.schema()).asc("id", NULLS_LAST).build();

  assertThat(SortOrderUtil.maybeFindTableSortOrder(table, unidentifiedOrder))
      .as("Should find current table sort order")
      .isEqualTo(table.sortOrder());
}

@Test
public void testFindSortOrderForTableThatIsNotCurrentOrder() {
  SortOrder idAscending =
      SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
  TestTables.TestTable table =
      TestTables.create(tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), idAscending, 2);

  // Replace the active order so that order id 1 is no longer the table's current sort order.
  table.replaceSortOrder().asc("data").desc("ts").commit();

  SortOrder historicOrder =
      SortOrder.builderFor(table.schema()).asc("id", NULLS_LAST).build();

  // The match should still resolve against the historic (non-current) order with id 1.
  assertThat(SortOrderUtil.maybeFindTableSortOrder(table, historicOrder))
      .as("Should find first sorted table sort order")
      .isEqualTo(table.sortOrders().get(1));
}

@Test
public void testReturnsEmptyForFindingNonMatchingSortOrder() {
  SortOrder idAscending =
      SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
  TestTables.TestTable table =
      TestTables.create(tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), idAscending, 2);

  table.replaceSortOrder().asc("data").desc("ts").commit();

  // id DESC matches neither the historic (id ASC) nor the current (data ASC, ts DESC) order.
  SortOrder nonMatchingOrder =
      SortOrder.builderFor(table.schema()).desc("id", NULLS_LAST).build();

  assertThat(SortOrderUtil.maybeFindTableSortOrder(table, nonMatchingOrder))
      .as(
          "Should return unsorted order if user supplied order does not match any table sort order")
      .isEqualTo(SortOrder.unsorted());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
Expand Down Expand Up @@ -162,6 +163,22 @@ public int outputSpecId() {
return outputSpecId;
}

/**
 * Resolves the table sort order requested via {@link SparkWriteOptions#OUTPUT_SORT_ORDER_ID}.
 *
 * <p>Falls back to the unsorted order id when the option is not set.
 *
 * @return the table {@link SortOrder} with the configured order id
 * @throws IllegalArgumentException if the configured id is not a sort order id of the table
 */
public SortOrder outputSortOrder() {
  int requestedOrderId =
      confParser
          .intConf()
          .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID)
          .defaultValue(SortOrder.unsorted().orderId())
          .parse();

  Preconditions.checkArgument(
      table.sortOrders().containsKey(requestedOrderId),
      "Output sort order id %s is not a valid sort order id for table",
      requestedOrderId);

  return table.sortOrders().get(requestedOrderId);
}

public FileFormat dataFileFormat() {
String valueAsString =
confParser
Expand Down Expand Up @@ -280,6 +297,21 @@ public SparkWriteRequirements writeRequirements() {
table, distributionMode(), fanoutWriterEnabled(), dataAdvisoryPartitionSize());
}

/**
 * Write requirements for a rewrite-files job.
 *
 * <p>When an explicit output sort order is configured, the standard write requirements are
 * augmented with that table sort order; otherwise the standard requirements are returned as-is.
 *
 * @return the write requirements to use for the rewrite job
 * @throws NullPointerException if no rewritten file set id is configured (i.e. not a rewrite job)
 */
public SparkWriteRequirements rewriteFilesWriteRequirements() {
  Preconditions.checkNotNull(
      rewrittenFileSetId(), "Can only use rewrite files write requirements during rewrite job!");

  SortOrder explicitSortOrder = outputSortOrder();
  if (!explicitSortOrder.isSorted()) {
    return writeRequirements();
  }

  LOG.info(
      "Found explicit sort order {} set in job configuration. Going to apply that to the sort-order-id of the rewritten files",
      Spark3Util.describe(explicitSortOrder));
  return writeRequirements().withTableSortOrder(explicitSortOrder);
}

@VisibleForTesting
DistributionMode distributionMode() {
String modeName =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private SparkWriteOptions() {}
public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID = "rewritten-file-scan-task-set-id";

public static final String OUTPUT_SPEC_ID = "output-spec-id";
public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";

public static final String OVERWRITE_MODE = "overwrite-mode";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,32 @@
/** A set of requirements such as distribution and ordering reported to Spark during writes. */
public class SparkWriteRequirements {

public static final long NO_ADVISORY_PARTITION_SIZE = 0;
public static final SparkWriteRequirements EMPTY =
new SparkWriteRequirements(Distributions.unspecified(), new SortOrder[0], 0);
new SparkWriteRequirements(
Distributions.unspecified(),
new SortOrder[0],
org.apache.iceberg.SortOrder.unsorted(),
NO_ADVISORY_PARTITION_SIZE);

private final Distribution distribution;
private final SortOrder[] ordering;
private final org.apache.iceberg.SortOrder icebergOrdering;
private final long advisoryPartitionSize;

SparkWriteRequirements(
Distribution distribution, SortOrder[] ordering, long advisoryPartitionSize) {
Distribution distribution,
SortOrder[] ordering,
org.apache.iceberg.SortOrder icebergOrdering,
long advisoryPartitionSize) {
this.distribution = distribution;
this.ordering = ordering;
this.advisoryPartitionSize = advisoryPartitionSize;
this.icebergOrdering = icebergOrdering;
// Spark prohibits requesting a particular advisory partition size without distribution
this.advisoryPartitionSize =
distribution instanceof UnspecifiedDistribution
? NO_ADVISORY_PARTITION_SIZE
: advisoryPartitionSize;
}

public Distribution distribution() {
Expand All @@ -48,12 +62,19 @@ public SortOrder[] ordering() {
return ordering;
}

public org.apache.iceberg.SortOrder icebergOrdering() {
return icebergOrdering;
}

public boolean hasOrdering() {
return ordering.length != 0;
}

public long advisoryPartitionSize() {
// Spark prohibits requesting a particular advisory partition size without distribution
return distribution instanceof UnspecifiedDistribution ? 0 : advisoryPartitionSize;
return advisoryPartitionSize;
}

public SparkWriteRequirements withTableSortOrder(org.apache.iceberg.SortOrder sortOrder) {
return new SparkWriteRequirements(distribution, ordering, sortOrder, advisoryPartitionSize);
}
}
Loading