From 14943e4df31c80dac03ea2ba4c6947c608d98e4c Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Thu, 3 Sep 2020 16:08:22 -0500 Subject: [PATCH 1/2] Add a Parallelized Scan Planning Action Planning for very large scans can take a considerable amount of time, even for queries that will not return a large amount of data. Part of the reason for this is that all planning for Spark reads happens locally, prior to any data actually being fetched by Spark. We can speed this up by providing a distributed planning Action which distributes the generation of the actual scan plan. This should allow us to scale planning with compute and IO resources. --- .../java/org/apache/iceberg/DataFile.java | 3 +- .../InclusiveMetricsEvaluator.java | 3 +- .../java/org/apache/iceberg/BaseFile.java | 9 + .../org/apache/iceberg/BaseTableScan.java | 27 +- .../org/apache/iceberg/DataTableScan.java | 5 + .../org/apache/iceberg/DeleteFileIndex.java | 26 +- .../iceberg/IncrementalDataTableScan.java | 47 +-- .../java/org/apache/iceberg/ScanTasks.java | 36 +++ .../org/apache/iceberg/TableScanContext.java | 24 +- .../org/apache/iceberg/util/SnapshotUtil.java | 71 +++++ .../apache/iceberg/util/TableScanUtil.java | 10 + .../org/apache/iceberg/actions/Actions.java | 4 + .../iceberg/actions/PlanScanAction.java | 288 ++++++++++++++++++ .../apache/iceberg/spark/SparkDataFile.java | 65 +++- .../apache/iceberg/spark/SparkStructLike.java | 3 +- .../spark/source/TestDataSourceOptions.java | 25 ++ .../source/TestIcebergSourceHadoopTables.java | 5 + .../source/TestIcebergSourceHiveTables.java | 5 + .../source/TestIcebergSourceTablesBase.java | 51 +++- .../spark/source/TestPartitionPruning.java | 24 +- .../spark/source/TestSnapshotSelection.java | 25 ++ .../spark/source/TestSparkReaderDeletes.java | 19 ++ .../apache/iceberg/spark/source/Reader.java | 42 ++- .../spark/source/TestDataSourceOptions24.java | 5 + .../TestIcebergSourceHadoopTables24.java | 5 + .../source/TestIcebergSourceHiveTables24.java | 5 + .../spark/source/TestPartitionPruning24.java | 6 +- .../spark/source/TestSnapshotSelection24.java | 5 + .../source/TestSparkReaderDeletes24.java | 5 + .../iceberg/spark/source/SparkBatchScan.java | 42 ++- .../spark/source/TestDataSourceOptions3.java | 5 + .../TestIcebergSourceHadoopTables3.java | 5 + .../source/TestIcebergSourceHiveTables3.java | 5 + .../spark/source/TestPartitionPruning3.java | 6 +- .../spark/source/TestSnapshotSelection3.java | 5 + .../spark/source/TestSparkReaderDeletes3.java | 5 + 36 files changed, 806 insertions(+), 115 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/ScanTasks.java create mode 100644 spark/src/main/java/org/apache/iceberg/actions/PlanScanAction.java diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 1b8092639b9b..09956293d5fd 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -80,7 +80,8 @@ static StructType getType(StructType partitionType) { UPPER_BOUNDS, KEY_METADATA, SPLIT_OFFSETS, - EQUALITY_IDS + EQUALITY_IDS, + ManifestFile.SPEC_ID.asOptional() ); } diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java index 7bad98ccc9f4..16515ebd504a 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java +++ 
b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java @@ -19,6 +19,7 @@ package org.apache.iceberg.expressions; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Comparator; @@ -45,7 +46,7 @@ * rows and false if the file cannot contain matching rows. Files may be skipped if and only if the * return value of {@code eval} is false. */ -public class InclusiveMetricsEvaluator { +public class InclusiveMetricsEvaluator implements Serializable { private final Expression expr; public InclusiveMetricsEvaluator(Schema schema, Expression unbound) { diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index 1d746717d3ab..b407c9f61b73 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -100,6 +100,10 @@ public PartitionData copy() { found = true; fromProjectionPos[i] = j; } + if (fields.get(i).fieldId() == ManifestFile.SPEC_ID.fieldId()) { + found = true; + fromProjectionPos[i] = 14; + } } if (!found) { @@ -255,6 +259,9 @@ public void put(int i, Object value) { case 13: this.equalityIds = ArrayUtil.toIntArray((List) value); return; + case 14: + this.partitionSpecId = (value != null) ? (Integer) value : -1; + return; default: // ignore the object, it must be from a newer version of the format } @@ -301,6 +308,8 @@ public Object get(int i) { return splitOffsets(); case 13: return equalityFieldIds(); + case 14: + return partitionSpecId; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index db0c18f217d6..86134a96bfa4 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -19,10 +19,6 @@ package org.apache.iceberg; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -47,7 +43,6 @@ abstract class BaseTableScan implements TableScan { private static final Logger LOG = LoggerFactory.getLogger(TableScan.class); - private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); private final TableOperations ops; private final Table table; @@ -65,6 +60,10 @@ protected BaseTableScan(TableOperations ops, Table table, Schema schema, TableSc this.context = context; } + protected TableScanContext tableScanContext() { + return context; + } + protected TableOperations tableOps() { return ops; } @@ -85,7 +84,7 @@ protected Collection selectedColumns() { return context.selectedColumns(); } - protected Map options() { + public Map options() { return context.options(); } @@ -105,6 +104,14 @@ protected abstract CloseableIterable planFiles( TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats); + public Long fromSnapshotId() { + return context().fromSnapshotId(); + } + + public Long toSnapshotId() { + return context.toSnapshotId(); + } + @Override public Table table() { return table; @@ -145,7 +152,7 @@ public TableScan asOfTime(long timestampMillis) { // the snapshot ID could be null if no entries were older than the requested time. in that case, // there is no valid snapshot to read. 
Preconditions.checkArgument(lastSnapshotId != null, - "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis)); + "Cannot find a snapshot older than %s", TableScanUtil.formatTimestampMillis(timestampMillis)); return useSnapshot(lastSnapshotId); } @@ -202,7 +209,7 @@ public CloseableIterable planFiles() { Snapshot snapshot = snapshot(); if (snapshot != null) { LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table, - snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()), + snapshot.snapshotId(), TableScanUtil.formatTimestampMillis(snapshot.timestampMillis()), context.rowFilter()); Listeners.notifyAll( @@ -307,8 +314,4 @@ private Schema lazyColumnProjection() { return schema; } - - private static String formatTimestampMillis(long millis) { - return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault())); - } } diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 9a6b85f652b8..1d7d8e43babf 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -94,4 +94,9 @@ protected long targetSplitSize(TableOperations ops) { return ops.current().propertyAsLong( TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT); } + + @Override + public TableScanContext tableScanContext() { + return super.tableScanContext(); + } } diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index 7605e1c3548b..c84742094932 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -62,7 +62,8 @@ * Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, DataFile)} or * {@link #forEntry(ManifestEntry)} to get the the delete files to apply to a given data file. 
*/ -class DeleteFileIndex { +public class DeleteFileIndex { + private final static DeleteFile[] EMPTY_DELETES = new DeleteFile[0]; private final Map specsById; private final Map partitionTypeById; private final Map> wrapperById; @@ -100,7 +101,10 @@ DeleteFile[] forEntry(ManifestEntry entry) { return forDataFile(entry.sequenceNumber(), entry.file()); } - DeleteFile[] forDataFile(long sequenceNumber, DataFile file) { + public DeleteFile[] forDataFile(long sequenceNumber, DataFile file) { + if (isEmpty()) { + return EMPTY_DELETES; + } Pair partition = partition(file.specId(), file.partition()); Pair partitionDeletes = sortedDeletesByPartition.get(partition); @@ -305,11 +309,11 @@ private static Stream limitBySequenceNumber(long sequenceNumber, lon return Arrays.stream(files, start, files.length); } - static Builder builderFor(FileIO io, Iterable deleteManifests) { + public static Builder builderFor(FileIO io, Iterable deleteManifests) { return new Builder(io, Sets.newHashSet(deleteManifests)); } - static class Builder { + public static class Builder { private final FileIO io; private final Set deleteManifests; private Map specsById = null; @@ -318,37 +322,37 @@ static class Builder { private boolean caseSensitive = true; private ExecutorService executorService = null; - Builder(FileIO io, Set deleteManifests) { + public Builder(FileIO io, Set deleteManifests) { this.io = io; this.deleteManifests = Sets.newHashSet(deleteManifests); } - Builder specsById(Map newSpecsById) { + public Builder specsById(Map newSpecsById) { this.specsById = newSpecsById; return this; } - Builder filterData(Expression newDataFilter) { + public Builder filterData(Expression newDataFilter) { this.dataFilter = Expressions.and(dataFilter, newDataFilter); return this; } - Builder filterPartitions(Expression newPartitionFilter) { + public Builder filterPartitions(Expression newPartitionFilter) { this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter); return this; } - Builder caseSensitive(boolean newCaseSensitive) { + public Builder caseSensitive(boolean newCaseSensitive) { this.caseSensitive = newCaseSensitive; return this; } - Builder planWith(ExecutorService newExecutorService) { + public Builder planWith(ExecutorService newExecutorService) { this.executorService = newExecutorService; return this; } - DeleteFileIndex build() { + public DeleteFileIndex build() { // read all of the matching delete manifests in parallel and accumulate the matching files in a queue Queue> deleteEntries = new ConcurrentLinkedQueue<>(); Tasks.foreach(deleteManifestReaders()) diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java index 46756b0830a3..ad20f5c45917 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java @@ -25,7 +25,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; @@ -34,7 +33,7 @@ class IncrementalDataTableScan extends DataTableScan { IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, 
TableScanContext context) { super(ops, table, schema, context.useSnapshotId(null)); - validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId()); + SnapshotUtil.validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId()); } @Override @@ -53,7 +52,7 @@ public TableScan useSnapshot(long scanSnapshotId) { @Override public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) { - validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId); + SnapshotUtil.validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId, table(), context()); return new IncrementalDataTableScan(tableOps(), table(), schema(), context().fromSnapshotId(newFromSnapshotId).toSnapshotId(newToSnapshotId)); } @@ -69,7 +68,7 @@ public TableScan appendsAfter(long newFromSnapshotId) { @Override public CloseableIterable planFiles() { // TODO publish an incremental appends scan event - List snapshots = snapshotsWithin(table(), + List snapshots = SnapshotUtil.snapshotsWithin(table(), context().fromSnapshotId(), context().toSnapshotId()); Set snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId)); Set manifests = FluentIterable @@ -106,45 +105,5 @@ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema sche return new IncrementalDataTableScan(ops, table, schema, context); } - private static List snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) { - List snapshotIds = SnapshotUtil.snapshotIdsBetween(table, fromSnapshotId, toSnapshotId); - List snapshots = Lists.newArrayList(); - for (Long snapshotId : snapshotIds) { - Snapshot snapshot = table.snapshot(snapshotId); - // for now, incremental scan supports only appends - if (snapshot.operation().equals(DataOperations.APPEND)) { - snapshots.add(snapshot); - } else if (snapshot.operation().equals(DataOperations.OVERWRITE)) { - throw new UnsupportedOperationException( - String.format("Found %s operation, cannot support incremental data in snapshots (%s, %s]", - DataOperations.OVERWRITE, fromSnapshotId, toSnapshotId)); - } - } - return snapshots; - } - private void validateSnapshotIdsRefinement(long newFromSnapshotId, long newToSnapshotId) { - Set snapshotIdsRange = Sets.newHashSet( - SnapshotUtil.snapshotIdsBetween(table(), context().fromSnapshotId(), context().toSnapshotId())); - // since snapshotIdsBetween return ids in range (fromSnapshotId, toSnapshotId] - snapshotIdsRange.add(context().fromSnapshotId()); - Preconditions.checkArgument( - snapshotIdsRange.contains(newFromSnapshotId), - "from snapshot id %s not in existing snapshot ids range (%s, %s]", - newFromSnapshotId, context().fromSnapshotId(), newToSnapshotId); - Preconditions.checkArgument( - snapshotIdsRange.contains(newToSnapshotId), - "to snapshot id %s not in existing snapshot ids range (%s, %s]", - newToSnapshotId, context().fromSnapshotId(), context().toSnapshotId()); - } - - private static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) { - Preconditions.checkArgument(fromSnapshotId != toSnapshotId, "from and to snapshot ids cannot be the same"); - Preconditions.checkArgument( - table.snapshot(fromSnapshotId) != null, "from snapshot %s does not exist", fromSnapshotId); - Preconditions.checkArgument( - table.snapshot(toSnapshotId) != null, "to snapshot %s does not exist", toSnapshotId); - Preconditions.checkArgument(SnapshotUtil.ancestorOf(table, toSnapshotId, fromSnapshotId), - "from snapshot %s is not an ancestor of to snapshot %s", fromSnapshotId, 
toSnapshotId); - } } diff --git a/core/src/main/java/org/apache/iceberg/ScanTasks.java new file mode 100644 index 000000000000..42e434b3a942 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ScanTasks.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import org.apache.iceberg.expressions.ResidualEvaluator; + +public class ScanTasks { + + /** + * Utility class; no public constructor. + */ + private ScanTasks() { + } + + public static BaseFileScanTask createBaseFileScanTask(DataFile file, DeleteFile[] deletes, String schemaString, + String specString, ResidualEvaluator residuals) { + return new BaseFileScanTask(file, deletes, schemaString, specString, residuals); + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java index dcf7d9753530..a2a49262df4e 100644 --- a/core/src/main/java/org/apache/iceberg/TableScanContext.java +++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java @@ -29,7 +29,7 @@ /** * Context object with optional arguments for a TableScan. 
*/ -final class TableScanContext { +public final class TableScanContext { private final Long snapshotId; private final Expression rowFilter; private final boolean ignoreResiduals; @@ -41,7 +41,7 @@ final class TableScanContext { private final Long fromSnapshotId; private final Long toSnapshotId; - TableScanContext() { + public TableScanContext() { this.snapshotId = null; this.rowFilter = Expressions.alwaysTrue(); this.ignoreResiduals = false; @@ -70,7 +70,7 @@ private TableScanContext(Long snapshotId, Expression rowFilter, boolean ignoreRe this.toSnapshotId = toSnapshotId; } - Long snapshotId() { + public Long snapshotId() { return snapshotId; } @@ -79,7 +79,7 @@ TableScanContext useSnapshotId(Long scanSnapshotId) { caseSensitive, colStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - Expression rowFilter() { + public Expression rowFilter() { return rowFilter; } @@ -88,7 +88,7 @@ TableScanContext filterRows(Expression filter) { caseSensitive, colStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - boolean ignoreResiduals() { + public boolean ignoreResiduals() { return ignoreResiduals; } @@ -97,7 +97,7 @@ TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) { caseSensitive, colStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - boolean caseSensitive() { + public boolean caseSensitive() { return caseSensitive; } @@ -106,7 +106,7 @@ TableScanContext setCaseSensitive(boolean isCaseSensitive) { isCaseSensitive, colStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - boolean returnColumnStats() { + public boolean returnColumnStats() { return colStats; } @@ -115,7 +115,7 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) { caseSensitive, returnColumnStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - Collection selectedColumns() { + public Collection selectedColumns() { return selectedColumns; } @@ -125,7 +125,7 @@ TableScanContext selectColumns(Collection columns) { caseSensitive, colStats, null, columns, options, fromSnapshotId, toSnapshotId); } - Schema projectedSchema() { + public Schema projectedSchema() { return projectedSchema; } @@ -135,7 +135,7 @@ TableScanContext project(Schema schema) { caseSensitive, colStats, schema, null, options, fromSnapshotId, toSnapshotId); } - Map options() { + public Map options() { return options; } @@ -147,7 +147,7 @@ TableScanContext withOption(String property, String value) { caseSensitive, colStats, projectedSchema, selectedColumns, builder.build(), fromSnapshotId, toSnapshotId); } - Long fromSnapshotId() { + public Long fromSnapshotId() { return fromSnapshotId; } @@ -156,7 +156,7 @@ TableScanContext fromSnapshotId(long id) { caseSensitive, colStats, projectedSchema, selectedColumns, options, id, toSnapshotId); } - Long toSnapshotId() { + public Long toSnapshotId() { return toSnapshotId; } diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index 85f08e057340..a3836e5397a9 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -20,13 +20,18 @@ package org.apache.iceberg.util; import java.util.List; +import java.util.Set; import java.util.function.Function; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataOperations; import org.apache.iceberg.Snapshot; import 
org.apache.iceberg.Table; +import org.apache.iceberg.TableScanContext; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; public class SnapshotUtil { private SnapshotUtil() { @@ -107,4 +112,70 @@ public static List newFiles(Long baseSnapshotId, long latestSnapshotId return newFiles; } + + public static List snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) { + List snapshotIds = SnapshotUtil.snapshotIdsBetween(table, fromSnapshotId, toSnapshotId); + List snapshots = Lists.newArrayList(); + for (Long snapshotId : snapshotIds) { + Snapshot snapshot = table.snapshot(snapshotId); + // for now, incremental scan supports only appends + if (snapshot.operation().equals(DataOperations.APPEND)) { + snapshots.add(snapshot); + } else if (snapshot.operation().equals(DataOperations.OVERWRITE)) { + throw new UnsupportedOperationException( + String.format("Found %s operation, cannot support incremental data in snapshots (%s, %s]", + DataOperations.OVERWRITE, fromSnapshotId, toSnapshotId)); + } + } + return snapshots; + } + + /** + * Checks whether or not the bounds presented in the form of snapshotIds both reside within the table in question + * and that there exists a valid set of snapshots between them. Uses the passed in scan context to verify that this + * range also resides within any previously defined snapshot range in the scan. Throws exceptions if the arguments + * passed cannot be used to generate a legitimate snapshot range within the previously defined range. + * <p> + * Used for validating incremental scan parameters + * + * @param newFromSnapshotId beginning of snapshot range + * @param newToSnapshotId end of snapshot range + * @param table containing the snapshots we are building a range for + * @param context containing current scan restrictions + */ + public static void validateSnapshotIdsRefinement(long newFromSnapshotId, long newToSnapshotId, Table table, + TableScanContext context) { + Set snapshotIdsRange = Sets.newHashSet( + SnapshotUtil.snapshotIdsBetween(table, context.fromSnapshotId(), context.toSnapshotId())); + // since snapshotIdsBetween return ids in range (fromSnapshotId, toSnapshotId] + snapshotIdsRange.add(context.fromSnapshotId()); + Preconditions.checkArgument( + snapshotIdsRange.contains(newFromSnapshotId), + "from snapshot id %s not in existing snapshot ids range (%s, %s]", + newFromSnapshotId, context.fromSnapshotId(), newToSnapshotId); + Preconditions.checkArgument( + snapshotIdsRange.contains(newToSnapshotId), + "to snapshot id %s not in existing snapshot ids range (%s, %s]", + newToSnapshotId, context.fromSnapshotId(), context.toSnapshotId()); + } + + /** + * Validates whether two snapshots represent the beginning and end of a continuous range of snapshots in a given + * table. Throws exceptions if this is not the case. 
+ * <p> + * Used for validating incremental scan parameters + * + * @param table containing snapshots + * @param fromSnapshotId beginning of snapshot range + * @param toSnapshotId end of snapshot range + */ + public static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) { + Preconditions.checkArgument(fromSnapshotId != toSnapshotId, "from and to snapshot ids cannot be the same"); + Preconditions.checkArgument( + table.snapshot(fromSnapshotId) != null, "from snapshot %s does not exist", fromSnapshotId); + Preconditions.checkArgument( + table.snapshot(toSnapshotId) != null, "to snapshot %s does not exist", toSnapshotId); + Preconditions.checkArgument(SnapshotUtil.ancestorOf(table, toSnapshotId, fromSnapshotId), + "from snapshot %s is not an ancestor of to snapshot %s", fromSnapshotId, toSnapshotId); + } } diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java index a9c60df32cdd..c55d13397c8a 100644 --- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java @@ -19,6 +19,10 @@ package org.apache.iceberg.util; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.function.Function; import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.CombinedScanTask; @@ -57,4 +61,10 @@ public static CloseableIterable planTasks(CloseableIterable> { + private static final Logger LOG = LoggerFactory.getLogger(PlanScanAction.class); + + + public enum PlanMode { + LOCAL, + DISTRIBUTED + } + + public static final String ICEBERG_PLAN_MODE = "iceberg.plan_mode"; + + public static final PlanMode parsePlanMode(String mode) { + try { + return PlanMode.valueOf(mode.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException(String.format("Cannot use planning mode %s, Available modes are: %s", mode, + Arrays.toString(PlanMode.values()))); + } + } + + private final Table table; + private final SparkSession spark; + private final JavaSparkContext sparkContext; + private final TableOperations ops; + private final Schema schema; + + private TableScanContext context; + + public PlanScanAction(SparkSession spark, Table table) { + this.table = table; + this.spark = spark; + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.schema = table.schema(); + this.ops = ((HasTableOperations) table).operations(); + this.context = new TableScanContext(); + } + + public PlanScanAction withContext(TableScanContext newContext) { + this.context = newContext; + return this; + } + + @Override + protected Table table() { + return table; + } + + @Override + public CloseableIterable execute() { + LOG.info("Preparing distributed planning of scan for {} snapshot {} created at {} with filter {}", + table, snapshot().snapshotId(), TableScanUtil.formatTimestampMillis(snapshot().timestampMillis()), + context.rowFilter()); + long start = System.currentTimeMillis(); + CloseableIterable result = planTasks(); + long elapsed = System.currentTimeMillis() - start; + LOG.info("Planning complete. 
Took {} ms", elapsed); + return result; + } + + protected CloseableIterable planTasks() { + Map options = context.options(); + long splitSize; + if (options.containsKey(TableProperties.SPLIT_SIZE)) { + splitSize = Long.parseLong(options.get(TableProperties.SPLIT_SIZE)); + } else { + splitSize = ops.current().propertyAsLong(TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT); + } + int lookback; + if (options.containsKey(TableProperties.SPLIT_LOOKBACK)) { + lookback = Integer.parseInt(options.get(TableProperties.SPLIT_LOOKBACK)); + } else { + lookback = ops.current().propertyAsInt( + TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT); + } + long openFileCost; + if (options.containsKey(TableProperties.SPLIT_OPEN_FILE_COST)) { + openFileCost = Long.parseLong(options.get(TableProperties.SPLIT_OPEN_FILE_COST)); + } else { + openFileCost = ops.current().propertyAsLong( + TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); + } + + CloseableIterable fileScanTasks = planFiles(); + CloseableIterable splitFiles = TableScanUtil.splitFiles(fileScanTasks, splitSize); + return TableScanUtil.planTasks(splitFiles, splitSize, lookback, openFileCost); + } + + private Snapshot snapshot() { + return context.snapshotId() != null ? + ops.current().snapshot(context.snapshotId()) : + ops.current().currentSnapshot(); + } + + public CloseableIterable planFiles() { + // Create a dataframe of all DataFile entries + String dataFilesMetadataTable = metadataTableName(MetadataTableType.ENTRIES); + Dataset manifestEntries = + spark.read() + .format("iceberg") + .option("snapshot-id", snapshot().snapshotId()) + .load(dataFilesMetadataTable); + + // Todo pushdown filters to ManifestEntriesTable + // Read entries which are not deleted and are datafiles and not delete files + Dataset dataFileEntries = manifestEntries + .filter(manifestEntries.col("data_file").getField(DataFile.CONTENT.name()).equalTo(0)) // Only DataFiles + .filter(manifestEntries.col("status").notEqual(2)); // No Deleted Files + + dataFileEntries = handleIncrementalScan(dataFileEntries); + + // Build up evaluators and filters for Metrics and Partition values + Expression scanFilter = context.rowFilter(); + boolean isCaseSensitive = context.caseSensitive(); + + // Build cache of partition evaluators + Broadcast> broadcastPartitionEvaluators = buildPartitionEvaluators(); + + // Build metric evaluators + Broadcast broadcastMetricsEvaluator = sparkContext.broadcast( + new InclusiveMetricsEvaluator(schema, scanFilter, isCaseSensitive)); + + // Cache residual information and Partition spec information + Types.StructType partitionStruct = DataFile.getType(table().spec().partitionType()); + StructType dataFileSchema = (StructType) dataFileEntries.schema().apply("data_file").dataType(); + + // Evaluate all files based on their partition info and collect the rows back locally + Dataset scanTaskDataset = dataFileEntries + .select(dataFileEntries.col("data_file.*")) + .mapPartitions( + (MapPartitionsFunction) it -> { + SparkDataFile container = new SparkDataFile(partitionStruct, dataFileSchema); + return Streams.stream(it) + .filter(row -> { + SparkDataFile file = container.wrap(row); + return broadcastPartitionEvaluators.getValue().get(file.specId()).eval(file.partition()) && + broadcastMetricsEvaluator.getValue().eval(file); + }).iterator(); + }, RowEncoder.apply(dataFileEntries.schema())); + + LoadingCache specCache = buildSpecCache(); + + // Build delete index locally + DeleteFileIndex deleteFileIndex 
= buildDeleteFileIndex(); + + SparkDataFile container = new SparkDataFile(partitionStruct, dataFileSchema); + List tasks = scanTaskDataset.collectAsList().stream().map(row -> { + Row dataFile = row.getAs("data_file"); + SparkDataFile file = container.wrap(dataFile); + DeleteFile[] deletes = + deleteFileIndex.forDataFile(row.getAs("sequence_number"), file); + SpecCacheEntry cached = specCache.get(file.specId()); + return (FileScanTask) ScanTasks + .createBaseFileScanTask(file.copy(), deletes, cached.schemaString, cached.specString, cached.residuals); + }).collect(Collectors.toList()); + + return CloseableIterable.withNoopClose(tasks); + } + + private Dataset handleIncrementalScan(Dataset dataFileEntries) { + if (context.fromSnapshotId() != null) { + LOG.debug("Planning incremental scan from {} to {}", context.fromSnapshotId(), context.toSnapshotId()); + List snapshots = SnapshotUtil.snapshotsWithin(table, context.fromSnapshotId(), context.toSnapshotId()); + List validSnapshotIds = snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toList()); + return dataFileEntries + .filter(dataFileEntries.col("snapshot_id").isin(validSnapshotIds.toArray())) + .filter(dataFileEntries.col("status").equalTo(1)); // Added files only + } else { + return dataFileEntries; + } + } + + private LoadingCache buildSpecCache() { + return Caffeine.newBuilder().build((CacheLoader & Serializable) specId -> { + PartitionSpec spec = table().specs().get(specId); + Expression filter = context.ignoreResiduals() ? Expressions.alwaysTrue() : context.rowFilter(); + return new SpecCacheEntry(SchemaParser.toJson(spec.schema()), PartitionSpecParser.toJson(spec), + ResidualEvaluator.of(spec, filter, context.caseSensitive())); + }); + } + + private Broadcast> buildPartitionEvaluators() { + ImmutableMap.Builder evalMapBuilder = ImmutableMap.builder(); + boolean caseSensitive = context.caseSensitive(); + Expression filter = context.rowFilter(); + table.specs().entrySet().forEach(entry -> + evalMapBuilder.put(entry.getKey(), + new Evaluator(entry.getValue().partitionType(), + Projections.inclusive(entry.getValue(), caseSensitive).project(filter)))); + + Map partitionEvaluatorsById = evalMapBuilder.build(); + return sparkContext.broadcast(partitionEvaluatorsById); + } + + + private DeleteFileIndex buildDeleteFileIndex() { + // Build delete index locally + List deleteManifests = snapshot().deleteManifests(); + DeleteFileIndex.Builder deleteFileIndexBuilder = DeleteFileIndex.builderFor(table.io(), deleteManifests); + deleteFileIndexBuilder.caseSensitive(context.caseSensitive()); + deleteFileIndexBuilder.specsById(table.specs()); + deleteFileIndexBuilder.filterData(context.rowFilter()); + return deleteFileIndexBuilder.build(); + } + + private static class SpecCacheEntry implements Serializable { + private final String schemaString; + private final String specString; + private final ResidualEvaluator residuals; + + SpecCacheEntry(String schemaString, String specString, ResidualEvaluator residuals) { + this.schemaString = schemaString; + this.specString = specString; + this.residuals = residuals; + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index 48dd00124273..a0f6c2c4c6e8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark; +import java.io.Serializable; import 
java.nio.ByteBuffer; import java.util.List; import java.util.Locale; @@ -32,7 +33,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructType; -public class SparkDataFile implements DataFile { +public class SparkDataFile implements DataFile, Serializable { private final int filePathPosition; private final int fileFormatPosition; @@ -46,18 +47,38 @@ public class SparkDataFile implements DataFile { private final int upperBoundsPosition; private final int keyMetadataPosition; private final int splitOffsetsPosition; + private final int specIdPosition; private final Type lowerBoundsType; private final Type upperBoundsType; private final Type keyMetadataType; private final SparkStructLike wrappedPartition; + private final Types.StructType partitionStruct; private Row wrapped; + private static final StructLike EMPTY_PARTITION_INFO = new StructLike() { + @Override + public int size() { + return 0; + } + + @Override + public T get(int pos, Class javaClass) { + throw new UnsupportedOperationException("Cannot get a value from an empty partition"); + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot set a value in an empty partition"); + } + }; + public SparkDataFile(Types.StructType type, StructType sparkType) { this.lowerBoundsType = type.fieldType("lower_bounds"); this.upperBoundsType = type.fieldType("upper_bounds"); this.keyMetadataType = type.fieldType("key_metadata"); - this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType()); + this.partitionStruct = type.fieldType("partition").asStructType(); + this.wrappedPartition = new SparkStructLike(partitionStruct); Map positions = Maps.newHashMap(); type.fields().forEach(field -> { @@ -77,19 +98,47 @@ public SparkDataFile(Types.StructType type, StructType sparkType) { upperBoundsPosition = positions.get("upper_bounds"); keyMetadataPosition = positions.get("key_metadata"); splitOffsetsPosition = positions.get("split_offsets"); + specIdPosition = positions.get("partition_spec_id"); + } + + private SparkDataFile(SparkDataFile other) { + this.lowerBoundsType = other.lowerBoundsType; + this.upperBoundsType = other.upperBoundsType; + this.keyMetadataType = other.keyMetadataType; + this.wrappedPartition = new SparkStructLike(other.partitionStruct); + this.filePathPosition = other.filePathPosition; + this.fileFormatPosition = other.fileFormatPosition; + this.partitionPosition = other.partitionPosition; + this.recordCountPosition = other.recordCountPosition; + this.fileSizeInBytesPosition = other.fileSizeInBytesPosition; + this.columnSizesPosition = other.columnSizesPosition; + this.valueCountsPosition = other.valueCountsPosition; + this.nullValueCountsPosition = other.nullValueCountsPosition; + this.lowerBoundsPosition = other.lowerBoundsPosition; + this.upperBoundsPosition = other.upperBoundsPosition; + this.keyMetadataPosition = other.keyMetadataPosition; + this.splitOffsetsPosition = other.splitOffsetsPosition; + this.specIdPosition = other.specIdPosition; + this.partitionStruct = other.partitionStruct; + this.wrap(other.wrapped.copy()); } public SparkDataFile wrap(Row row) { this.wrapped = row; if (wrappedPartition.size() > 0) { - this.wrappedPartition.wrap(row.getAs(partitionPosition)); + Row partition = row.getAs(partitionPosition); + this.wrappedPartition.wrap(partition); } return this; } @Override public int specId() { - return -1; + if (wrappedPartition.size() > 0) { + return wrapped.getAs(specIdPosition); + } else { + return 0; + } } @Override 
@@ -105,7 +154,11 @@ public FileFormat format() { @Override public StructLike partition() { - return wrappedPartition; + if (wrappedPartition.size() > 0) { + return wrappedPartition; + } else { + return EMPTY_PARTITION_INFO; + } } @Override @@ -152,7 +205,7 @@ public ByteBuffer keyMetadata() { @Override public DataFile copy() { - throw new UnsupportedOperationException("Not implemented: copy"); + return new SparkDataFile(this); } @Override diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkStructLike.java b/spark/src/main/java/org/apache/iceberg/spark/SparkStructLike.java index 30509e3381dc..7ba7a0956a9d 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkStructLike.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkStructLike.java @@ -19,11 +19,12 @@ package org.apache.iceberg.spark; +import java.io.Serializable; import org.apache.iceberg.StructLike; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Row; -public class SparkStructLike implements StructLike { +public class SparkStructLike implements StructLike, Serializable { private final Types.StructType type; private Row wrapped; diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index cf1b8c9b59a7..99dab9b5ec21 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -31,6 +31,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -47,9 +48,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.iceberg.types.Types.NestedField.optional; +@RunWith(Parameterized.class) public abstract class TestDataSourceOptions { private static final Configuration CONF = new Configuration(); @@ -59,6 +63,17 @@ public abstract class TestDataSourceOptions { ); private static SparkSession spark = null; + private final PlanScanAction.PlanMode planMode; + + @Parameterized.Parameters(name = "Plan Mode = {0}") + public static Object[] parameters() { + return new Object[] {PlanScanAction.PlanMode.LOCAL, PlanScanAction.PlanMode.DISTRIBUTED}; + } + + public TestDataSourceOptions(PlanScanAction.PlanMode planMode) { + this.planMode = planMode; + } + @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -163,6 +178,7 @@ public void testHadoopOptions() throws IOException { Dataset resultDf = spark.read() .format("iceberg") .option("hadoop.fs.default.name", "file:///") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); List resultRecords = resultDf.orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -197,6 +213,7 @@ public void testSplitOptionsOverridesTableProperties() throws IOException { Dataset resultDf = spark.read() .format("iceberg") .option("split-size", String.valueOf(611)) // 611 bytes is the size of SimpleRecord(1,"a") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); Assert.assertEquals("Spark partitions should match", 2, resultDf.javaRDD().getNumPartitions()); @@ -236,6 +253,7 @@ 
public void testIncrementalScanOptions() throws IOException { .format("iceberg") .option("snapshot-id", snapshotIds.get(3).toString()) .option("start-snapshot-id", snapshotIds.get(3).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation).explain(); }); @@ -249,6 +267,7 @@ public void testIncrementalScanOptions() throws IOException { .format("iceberg") .option("as-of-timestamp", Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis())) .option("end-snapshot-id", snapshotIds.get(2).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation).explain(); }); @@ -261,6 +280,7 @@ public void testIncrementalScanOptions() throws IOException { spark.read() .format("iceberg") .option("end-snapshot-id", snapshotIds.get(2).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation).explain(); }); @@ -268,6 +288,7 @@ public void testIncrementalScanOptions() throws IOException { List result = spark.read() .format("iceberg") .option("start-snapshot-id", snapshotIds.get(3).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation) .orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -279,6 +300,7 @@ public void testIncrementalScanOptions() throws IOException { .format("iceberg") .option("start-snapshot-id", snapshotIds.get(2).toString()) .option("end-snapshot-id", snapshotIds.get(1).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation) .orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -322,6 +344,7 @@ public void testMetadataSplitSizeOptionOverrideTableProperties() throws IOExcept Dataset entriesDf = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation + "#entries"); Assert.assertEquals("Num partitions must match", 2, entriesDf.javaRDD().getNumPartitions()); @@ -329,6 +352,7 @@ public void testMetadataSplitSizeOptionOverrideTableProperties() throws IOExcept entriesDf = spark.read() .format("iceberg") .option("split-size", String.valueOf(128 * 1024 * 1024)) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation + "#entries"); Assert.assertEquals("Num partitions must match", 1, entriesDf.javaRDD().getNumPartitions()); } @@ -359,6 +383,7 @@ public void testDefaultMetadataSplitSize() throws IOException { Dataset metadataDf = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation + "#entries"); int partitionNum = metadataDf.javaRDD().getNumPartitions(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java index b869d7f8dc9d..b63adbdec4bc 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java @@ -24,6 +24,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopTables; import org.junit.Before; @@ -35,6 +36,10 @@ public abstract class TestIcebergSourceHadoopTables extends TestIcebergSourceTab File tableDir = null; String tableLocation = null; + public TestIcebergSourceHadoopTables(PlanScanAction.PlanMode 
planMode) { + super(planMode); + } + @Before public void setupTable() throws Exception { this.tableDir = temp.newFolder(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java index 70e920c8bd34..8493bb14bd13 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java @@ -25,6 +25,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.junit.After; @@ -34,6 +35,10 @@ public abstract class TestIcebergSourceHiveTables extends TestIcebergSourceTable private static TableIdentifier currentIdentifier; + public TestIcebergSourceHiveTables(PlanScanAction.PlanMode planMode) { + super(planMode); + } + @BeforeClass public static void start() { Namespace db = Namespace.of("db"); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 257042179c9b..2d74f3c219c0 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -32,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.actions.Actions; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.catalog.TableIdentifier; @@ -53,10 +54,13 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +@RunWith(Parameterized.class) public abstract class TestIcebergSourceTablesBase extends SparkTestBase { private static final Schema SCHEMA = new Schema( @@ -64,6 +68,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase { optional(2, "data", Types.StringType.get()) ); + private final PlanScanAction.PlanMode planMode; + + @Parameterized.Parameters(name = "Plan Mode = {0}") + public static Object[] parameters() { + return new Object[] {PlanScanAction.PlanMode.LOCAL, PlanScanAction.PlanMode.DISTRIBUTED}; + } + + public TestIcebergSourceTablesBase(PlanScanAction.PlanMode planMode) { + this.planMode = planMode; + } + @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -93,6 +108,7 @@ public synchronized void testTablesSupport() { Dataset resultDf = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier)); List actualRecords = resultDf.orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -119,6 +135,7 @@ public void testEntriesTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "entries")) .collectAsList(); @@ -134,6 +151,7 @@ public void testEntriesTable() throws Exception { row.put(2, 0L); 
GenericData.Record file = (GenericData.Record) row.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(13, 0); // SpecId expected.add(row); }); } @@ -171,6 +189,7 @@ public void testAllEntriesTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_entries")) .orderBy("snapshot_id") .collectAsList(); @@ -184,6 +203,7 @@ public void testAllEntriesTable() throws Exception { row.put(2, 0L); GenericData.Record file = (GenericData.Record) row.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(13, 0); // SpecId expected.add(row); }); } @@ -215,11 +235,15 @@ public void testCountEntriesTable() { // count entries Assert.assertEquals("Count should return " + expectedEntryCount, - expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries")).count()); + expectedEntryCount, spark.read().format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) + .load(loadLocation(tableIdentifier, "entries")).count()); // count all_entries Assert.assertEquals("Count should return " + expectedEntryCount, - expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_entries")).count()); + expectedEntryCount, spark.read().format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) + .load(loadLocation(tableIdentifier, "all_entries")).count()); } @Test @@ -248,6 +272,7 @@ public void testFilesTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "files")) .collectAsList(); @@ -259,6 +284,7 @@ public void testFilesTable() throws Exception { if ((Integer) record.get("status") < 2 /* added or existing */) { GenericData.Record file = (GenericData.Record) record.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(14, 0); // SpecId expected.add(file); } } @@ -305,6 +331,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "files")) .collectAsList(); @@ -315,6 +342,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { for (GenericData.Record record : rows) { GenericData.Record file = (GenericData.Record) record.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(14, 0); // SpecId expected.add(file); } } @@ -364,6 +392,7 @@ public void testEntriesTableWithSnapshotIdInheritance() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "entries")) .select("sequence_number", "snapshot_id", "data_file") .collectAsList(); @@ -411,6 +440,7 @@ public void testFilesUnpartitionedTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "files")) .collectAsList(); @@ -422,6 +452,7 @@ public void testFilesUnpartitionedTable() throws Exception { if ((Integer) record.get("status") < 2 /* added or existing */) { GenericData.Record file = (GenericData.Record) record.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(13, 0); // SpecId expected.add(file); } } @@ -456,16 +487,19 @@ public void 
testAllMetadataTablesWithStagedCommits() throws Exception { List actualAllData = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_data_files")) .collectAsList(); List actualAllManifests = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_manifests")) .collectAsList(); List actualAllEntries = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_entries")) .collectAsList(); @@ -506,6 +540,7 @@ public void testAllDataFilesTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_data_files")) .orderBy("file_path") .collectAsList(); @@ -519,6 +554,7 @@ public void testAllDataFilesTable() throws Exception { if ((Integer) record.get("status") < 2 /* added or existing */) { GenericData.Record file = (GenericData.Record) record.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(14, 0); // SpecId expected.add(file); } } @@ -576,6 +612,7 @@ public void testHistoryTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "history")) .collectAsList(); @@ -639,6 +676,7 @@ public void testSnapshotsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "snapshots")) .collectAsList(); @@ -691,6 +729,7 @@ public void testManifestsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "manifests")) .collectAsList(); @@ -744,6 +783,7 @@ public void testAllManifestsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_manifests")) .orderBy("path") .collectAsList(); @@ -810,6 +850,7 @@ public void testUnpartitionedPartitionsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "partitions")) .collectAsList(); @@ -841,6 +882,7 @@ public void testPartitionsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "partitions")) .orderBy("partition.id") .collectAsList(); @@ -871,6 +913,7 @@ public void testPartitionsTable() { List actualAfterFirstCommit = spark.read() .format("iceberg") .option("snapshot-id", String.valueOf(firstCommitId)) + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "partitions")) .orderBy("partition.id") .collectAsList(); @@ -913,7 +956,9 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException { .execute(); Assert.assertEquals("Should delete 1 data file", 1, result2.size()); - Dataset resultDF = spark.read().format("iceberg").load(loadLocation(tableIdentifier)); + Dataset resultDF = spark.read().format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) + .load(loadLocation(tableIdentifier)); List actualRecords = resultDF .as(Encoders.bean(SimpleRecord.class)) .collectAsList(); diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java index 9159563db12f..f645fec0d647 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java @@ -42,6 +42,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -75,23 +76,27 @@ public abstract class TestPartitionPruning { private static final Configuration CONF = new Configuration(); private static final HadoopTables TABLES = new HadoopTables(CONF); - @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + @Parameterized.Parameters(name = "File format {0} - Vectorized Read {1} - Plan Mode {2}") public static Object[][] parameters() { return new Object[][] { - { "parquet", false }, - { "parquet", true }, - { "avro", false }, - { "orc", false }, - { "orc", true } + new Object[] { "parquet", false, PlanScanAction.PlanMode.LOCAL }, + new Object[] { "parquet", false, PlanScanAction.PlanMode.DISTRIBUTED }, + new Object[] { "parquet", true, PlanScanAction.PlanMode.LOCAL }, + new Object[] { "avro", false, PlanScanAction.PlanMode.DISTRIBUTED }, + new Object[] { "orc", false, PlanScanAction.PlanMode.LOCAL }, + new Object[] { "orc", false, PlanScanAction.PlanMode.DISTRIBUTED }, + new Object[] { "orc", true, PlanScanAction.PlanMode.LOCAL}, }; } private final String format; private final boolean vectorized; + private final PlanScanAction.PlanMode planMode; - public TestPartitionPruning(String format, boolean vectorized) { + public TestPartitionPruning(String format, boolean vectorized, PlanScanAction.PlanMode planMode) { this.format = format; this.vectorized = vectorized; + this.planMode = planMode; } private static SparkSession spark = null; @@ -255,6 +260,7 @@ private void runTest(String filterCond, Predicate partCondition) { List actual = spark.read() .format("iceberg") .option("vectorization-enabled", String.valueOf(vectorized)) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(table.location()) .select("id", "date", "level", "message", "timestamp") .filter(filterCond) @@ -315,7 +321,9 @@ private void assertAccessOnDataFiles(File originTableLocation, Table table, Pred .stream().filter(path -> path.startsWith(originTableLocation.getAbsolutePath())) .collect(Collectors.toSet()); - List files = spark.read().format("iceberg").load(table.location() + "#files").collectAsList(); + List files = spark.read().format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) + .load(table.location() + "#files").collectAsList(); Set filesToRead = extractFilePathsMatchingConditionOnPartition(files, partCondition); Set filesToNotRead = extractFilePathsNotIn(files, filesToRead); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 9ea7d261911e..4ea9fcef02ed 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -26,6 +26,7 @@ import org.apache.iceberg.Schema; import 
org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -40,22 +41,39 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.iceberg.types.Types.NestedField.optional; +@RunWith(Parameterized.class) public abstract class TestSnapshotSelection { + @Parameterized.Parameters(name = "Plan Mode {0}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {PlanScanAction.PlanMode.LOCAL}, + new Object[] {PlanScanAction.PlanMode.DISTRIBUTED}, + }; + } + private static final Configuration CONF = new Configuration(); private static final Schema SCHEMA = new Schema( optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()) ); + private final PlanScanAction.PlanMode planMode; + @Rule public TemporaryFolder temp = new TemporaryFolder(); private static SparkSession spark = null; + public TestSnapshotSelection(PlanScanAction.PlanMode planMode) { + this.planMode = planMode; + } + @BeforeClass public static void startSpark() { TestSnapshotSelection.spark = SparkSession.builder().master("local[2]").getOrCreate(); @@ -99,6 +117,7 @@ public void testSnapshotSelectionById() throws IOException { // verify records in the current snapshot Dataset currentSnapshotResult = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); List currentSnapshotRecords = currentSnapshotResult.orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -114,6 +133,7 @@ public void testSnapshotSelectionById() throws IOException { Dataset previousSnapshotResult = spark.read() .format("iceberg") .option("snapshot-id", parentSnapshotId) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); List previousSnapshotRecords = previousSnapshotResult.orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -155,6 +175,7 @@ public void testSnapshotSelectionByTimestamp() throws IOException { // verify records in the current snapshot Dataset currentSnapshotResult = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); List currentSnapshotRecords = currentSnapshotResult.orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -168,6 +189,7 @@ public void testSnapshotSelectionByTimestamp() throws IOException { Dataset previousSnapshotResult = spark.read() .format("iceberg") .option("as-of-timestamp", firstSnapshotTimestamp) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); List previousSnapshotRecords = previousSnapshotResult.orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -186,6 +208,7 @@ public void testSnapshotSelectionByInvalidSnapshotId() throws IOException { Dataset df = spark.read() .format("iceberg") .option("snapshot-id", -10) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); df.collectAsList(); @@ -203,6 +226,7 @@ public void testSnapshotSelectionByInvalidTimestamp() throws IOException { Dataset df = spark.read() .format("iceberg") .option("as-of-timestamp", timestamp) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); df.collectAsList(); @@ -230,6 +254,7 @@ 
public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException { .format("iceberg") .option("snapshot-id", snapshotId) .option("as-of-timestamp", timestamp) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); df.collectAsList(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 229a361f6924..4b4184f0048d 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -26,6 +26,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.DeleteReadTests; @@ -41,15 +42,32 @@ import org.apache.spark.sql.internal.SQLConf; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +@RunWith(Parameterized.class) public abstract class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + @Parameterized.Parameters(name = "Plan Mode {0}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {PlanScanAction.PlanMode.LOCAL}, + new Object[] {PlanScanAction.PlanMode.DISTRIBUTED}, + }; + } + + private final PlanScanAction.PlanMode planMode; + + public TestSparkReaderDeletes(PlanScanAction.PlanMode planMode) { + this.planMode = planMode; + } + @BeforeClass public static void startMetastoreAndSpark() { metastore = new TestHiveMetastore(); @@ -101,6 +119,7 @@ protected void dropTable(String name) { public StructLikeSet rowSet(String name, Table table, String... 
columns) { Dataset df = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(TableIdentifier.of("default", name).toString()) .selectExpr(columns); diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index 826c4508381b..ff55f6823ee9 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataTableScan; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; @@ -36,6 +37,9 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; +import org.apache.iceberg.actions.Actions; +import org.apache.iceberg.actions.PlanScanAction; +import org.apache.iceberg.actions.PlanScanAction.PlanMode; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; @@ -96,6 +100,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus private final boolean localityPreferred; private final boolean batchReadsEnabled; private final int batchSize; + private final PlanMode planMode; // lazy variables private Schema schema = null; @@ -108,6 +113,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus this.table = table; this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null); this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null); + this.planMode = options.get(PlanScanAction.ICEBERG_PLAN_MODE).map(PlanScanAction::parsePlanMode) + .orElse(PlanMode.LOCAL); if (snapshotId != null && asOfTimestamp != null) { throw new IllegalArgumentException( "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot"); @@ -385,16 +392,39 @@ private List tasks() { } } - try (CloseableIterable tasksIterable = scan.planTasks()) { - this.tasks = Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); - } + this.tasks = planScan(scan); } - return tasks; } + private List planScan(TableScan scan) { + // TODO Add Automatic mode for determining when to do Distributed Planning + if (planMode == PlanMode.DISTRIBUTED && scan instanceof DataTableScan) { + return planDistributedScan((DataTableScan) scan); + } else { + return planLocalScan(scan); + } + } + + private List planDistributedScan(DataTableScan scan) { + List result; + try { + result = Lists.newArrayList(Actions.forTable(table).planScan().withContext(scan.tableScanContext()).execute()); + } catch (Exception e) { + LOG.error("Cannot run distributed job planning, falling back to local planning.", e); + result = planLocalScan(scan); + } + return result; + } + + private List planLocalScan(TableScan scan) { + try (CloseableIterable tasksIterable = scan.planTasks()) { + return Lists.newArrayList(tasksIterable); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); + } + } + @Override public String toString() { return String.format( diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions24.java 
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions24.java index 26af76e52614..d73187d51970 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions24.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions24.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestDataSourceOptions24 extends TestDataSourceOptions { + public TestDataSourceOptions24(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables24.java index 1252f3427973..4275dfc61074 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables24.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables24.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestIcebergSourceHadoopTables24 extends TestIcebergSourceHadoopTables { + public TestIcebergSourceHadoopTables24(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables24.java index 7ba46cb90aca..a338208e8018 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables24.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables24.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestIcebergSourceHiveTables24 extends TestIcebergSourceHiveTables { + public TestIcebergSourceHiveTables24(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning24.java index 4b3889e8af21..e5232f5640df 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning24.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning24.java @@ -19,8 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestPartitionPruning24 extends TestPartitionPruning { - public TestPartitionPruning24(String format, boolean vectorized) { - super(format, vectorized); + public TestPartitionPruning24(String format, boolean vectorized, PlanScanAction.PlanMode distributedPlanning) { + super(format, vectorized, distributedPlanning); } } diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection24.java index c003845fba49..0f9488935ab3 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection24.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection24.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestSnapshotSelection24 extends TestSnapshotSelection { + public TestSnapshotSelection24(PlanScanAction.PlanMode 
distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java index 5b13bffca166..3d22ff3fcd68 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestSparkReaderDeletes24 extends TestSparkReaderDeletes { + public TestSparkReaderDeletes24(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 13b85ad2135a..85094b697c89 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataTableScan; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; @@ -34,6 +35,9 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; +import org.apache.iceberg.actions.Actions; +import org.apache.iceberg.actions.PlanScanAction; +import org.apache.iceberg.actions.PlanScanAction.PlanMode; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; @@ -82,6 +86,7 @@ class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { private final Broadcast encryptionManager; private final boolean batchReadsEnabled; private final int batchSize; + private final PlanMode planMode; // lazy variables private StructType readSchema = null; @@ -97,6 +102,8 @@ class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { this.filterExpressions = filters; this.snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null); this.asOfTimestamp = Spark3Util.propertyAsLong(options, "as-of-timestamp", null); + this.planMode = PlanScanAction.parsePlanMode( + options.getOrDefault(PlanScanAction.ICEBERG_PLAN_MODE, PlanMode.LOCAL.name())); if (snapshotId != null && asOfTimestamp != null) { throw new IllegalArgumentException( @@ -280,15 +287,38 @@ private List tasks() { scan = scan.filter(filter); } } + this.tasks = planScan(scan); + } + return tasks; + } - try (CloseableIterable tasksIterable = scan.planTasks()) { - this.tasks = Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); - } + private List planScan(TableScan scan) { + // TODO Need to only use distributed planner for supported implementations and add some heuristics about when + // to use + if (planMode == PlanMode.DISTRIBUTED && scan instanceof DataTableScan) { + return planDistributedScan((DataTableScan) scan); + } else { + return planLocalScan(scan); + } + } + + private List planDistributedScan(DataTableScan scan) { + List result; + try { + result = Lists.newArrayList(Actions.forTable(table).planScan().withContext(scan.tableScanContext()).execute()); + } catch (Exception e) { + 
LOG.error("Cannot run distributed job planning, falling back to local planning.", e); + result = planLocalScan(scan); } + return result; + } - return tasks; + private List planLocalScan(TableScan scan) { + try (CloseableIterable tasksIterable = scan.planTasks()) { + return Lists.newArrayList(tasksIterable); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); + } } @Override diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions3.java index 8dbf08c3caf6..5f8071944f83 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions3.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestDataSourceOptions3 extends TestDataSourceOptions { + public TestDataSourceOptions3(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables3.java index 166a2bef0979..5a3d229d6545 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables3.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestIcebergSourceHadoopTables3 extends TestIcebergSourceHadoopTables { + public TestIcebergSourceHadoopTables3(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables3.java index c69f3bb8dd56..68062f0c86e3 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables3.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestIcebergSourceHiveTables3 extends TestIcebergSourceHiveTables { + public TestIcebergSourceHiveTables3(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning3.java index 2d09c8287ea1..56acf7125e53 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning3.java @@ -19,8 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestPartitionPruning3 extends TestPartitionPruning { - public TestPartitionPruning3(String format, boolean vectorized) { - super(format, vectorized); + public TestPartitionPruning3(String format, boolean vectorized, PlanScanAction.PlanMode distributedPlanning) { + super(format, vectorized, distributedPlanning); } } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection3.java 
b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection3.java index 4a08f844ce48..e40ca5c6f585 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection3.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestSnapshotSelection3 extends TestSnapshotSelection { + public TestSnapshotSelection3(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java index 82835d695106..ca10bb467d36 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java @@ -19,5 +19,10 @@ package org.apache.iceberg.spark.source; +import org.apache.iceberg.actions.PlanScanAction; + public class TestSparkReaderDeletes3 extends TestSparkReaderDeletes { + public TestSparkReaderDeletes3(PlanScanAction.PlanMode distributedPlanning) { + super(distributedPlanning); + } } From b37a880ad27a5c4b1441ed82421792d08ed57351 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Mon, 26 Oct 2020 12:10:45 -0500 Subject: [PATCH 2/2] More Reviewer Comments --- .../org/apache/iceberg/DeleteFileIndex.java | 2 +- .../apache/iceberg/ManifestEntriesTable.java | 4 ++-- .../iceberg/actions/PlanScanAction.java | 20 +++++++++++-------- .../apache/iceberg/spark/SparkTestBase.java | 2 ++ .../spark/source/TestDataSourceOptions.java | 5 ++++- .../spark/source/TestPartitionPruning.java | 1 + .../spark/source/TestSnapshotSelection.java | 5 ++++- .../spark/source/TestSparkReaderDeletes.java | 1 + .../apache/iceberg/spark/source/Reader.java | 3 +++ .../iceberg/spark/source/SparkBatchScan.java | 4 ++++ 10 files changed, 34 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index c84742094932..a84f2fed98d7 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -63,7 +63,7 @@ * {@link #forEntry(ManifestEntry)} to get the the delete files to apply to a given data file. 
*/ public class DeleteFileIndex { - private final static DeleteFile[] EMPTY_DELETES = new DeleteFile[0]; + private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0]; private final Map specsById; private final Map partitionTypeById; private final Map> wrapperById; diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java index 262cd92223fa..855494e442fd 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java @@ -102,8 +102,8 @@ protected long targetSplitSize(TableOperations ops) { protected CloseableIterable planFiles( TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats) { - // return entries from both data and delete manifests - CloseableIterable manifests = CloseableIterable.withNoopClose(snapshot.allManifests()); + // Only return Data Manifests, TODO Handle Delete Manifests + CloseableIterable manifests = CloseableIterable.withNoopClose(snapshot.dataManifests()); Type fileProjection = schema().findType("data_file"); Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema(); String schemaString = SchemaParser.toJson(schema()); diff --git a/spark/src/main/java/org/apache/iceberg/actions/PlanScanAction.java b/spark/src/main/java/org/apache/iceberg/actions/PlanScanAction.java index 309faefb9308..eb1bce446dfd 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/PlanScanAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/PlanScanAction.java @@ -73,15 +73,15 @@ public class PlanScanAction extends BaseAction> { private static final Logger LOG = LoggerFactory.getLogger(PlanScanAction.class); - public enum PlanMode { LOCAL, DISTRIBUTED } - public static final String ICEBERG_PLAN_MODE = "iceberg.plan_mode"; + public static final String ICEBERG_PLAN_MODE = "plan_mode"; + public static final String ICEBERG_TEST_PLAN_MODE = "test_plan_mode"; - public static final PlanMode parsePlanMode(String mode) { + public static PlanMode parsePlanMode(String mode) { try { return PlanMode.valueOf(mode.toUpperCase(Locale.ROOT)); } catch (IllegalArgumentException ex) { @@ -97,6 +97,7 @@ public static final PlanMode parsePlanMode(String mode) { private final Schema schema; private TableScanContext context; + private Snapshot lazySnapshot; public PlanScanAction(SparkSession spark, Table table) { this.table = table; @@ -158,9 +159,12 @@ protected CloseableIterable planTasks() { } private Snapshot snapshot() { - return context.snapshotId() != null ? - ops.current().snapshot(context.snapshotId()) : - ops.current().currentSnapshot(); + if (lazySnapshot == null) { + lazySnapshot = context.snapshotId() != null ? 
+ ops.current().snapshot(context.snapshotId()) : + ops.current().currentSnapshot(); + } + return lazySnapshot; } public CloseableIterable planFiles() { @@ -197,13 +201,13 @@ public CloseableIterable planFiles() { // Evaluate all files based on their partition info and collect the rows back locally Dataset scanTaskDataset = dataFileEntries - .select(dataFileEntries.col("data_file.*")) .mapPartitions( (MapPartitionsFunction) it -> { SparkDataFile container = new SparkDataFile(partitionStruct, dataFileSchema); return Streams.stream(it) .filter(row -> { - SparkDataFile file = container.wrap(row); + Row dataFile = row.getAs("data_file"); + SparkDataFile file = container.wrap(dataFile); return broadcastPartitionEvaluators.getValue().get(file.specId()).eval(file.partition()) && broadcastMetricsEvaluator.getValue().eval(file); }).iterator(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java index 9ac098cae376..da2e678bc7b5 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.hive.HiveCatalog; @@ -57,6 +58,7 @@ public static void startMetastoreAndSpark() { .master("local[2]") .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true") .enableHiveSupport() .getOrCreate(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 99dab9b5ec21..289b581e3ba5 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -79,7 +79,10 @@ public TestDataSourceOptions(PlanScanAction.PlanMode planMode) { @BeforeClass public static void startSpark() { - TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate(); + TestDataSourceOptions.spark = SparkSession + .builder() + .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true") + .master("local[2]").getOrCreate(); } @AfterClass diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java index f645fec0d647..ac0d82fc0ced 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java @@ -114,6 +114,7 @@ public static void startSpark() { String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme); CONF.set(optionKey, CountOpenLocalFileSystem.class.getName()); spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName()); + spark.conf().set(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true"); spark.conf().set("spark.sql.session.timeZone", "UTC"); spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType); spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), 
DataTypes.StringType); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 4ea9fcef02ed..965c5e67a216 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -76,7 +76,10 @@ public TestSnapshotSelection(PlanScanAction.PlanMode planMode) { @BeforeClass public static void startSpark() { - TestSnapshotSelection.spark = SparkSession.builder().master("local[2]").getOrCreate(); + TestSnapshotSelection.spark = SparkSession + .builder() + .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true") + .master("local[2]").getOrCreate(); } @AfterClass diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 4b4184f0048d..8baf86a154e3 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -76,6 +76,7 @@ public static void startMetastoreAndSpark() { spark = SparkSession.builder() .master("local[2]") + .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true") .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) .enableHiveSupport() diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index ff55f6823ee9..9e7258d05704 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -411,6 +411,9 @@ private List planDistributedScan(DataTableScan scan) { try { result = Lists.newArrayList(Actions.forTable(table).planScan().withContext(scan.tableScanContext()).execute()); } catch (Exception e) { + if (SparkSession.active().conf().get(PlanScanAction.ICEBERG_TEST_PLAN_MODE).equals("true")) { + throw e; + } LOG.error("Cannot run distributed job planning, falling back to local planning.", e); result = planLocalScan(scan); } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 85094b697c89..8a2fc757350a 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -53,6 +53,7 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; @@ -307,6 +308,9 @@ private List planDistributedScan(DataTableScan scan) { try { result = Lists.newArrayList(Actions.forTable(table).planScan().withContext(scan.tableScanContext()).execute()); } catch (Exception e) { + if (SparkSession.active().conf().get(PlanScanAction.ICEBERG_TEST_PLAN_MODE).equals("true")) { + throw e; + } LOG.error("Cannot run distributed job planning, falling back to local planning.", e); result = planLocalScan(scan); }
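To make the new option easier to try out, here is a minimal, self-contained sketch of a read that opts into distributed scan planning, mirroring the .option(PlanScanAction.ICEBERG_PLAN_MODE, ...) calls used throughout the tests above. The table location and the local master are placeholders, not anything required by the patch.

import org.apache.iceberg.actions.PlanScanAction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DistributedPlanningRead {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")  // placeholder master for a local run
        .getOrCreate();

    // "/tmp/warehouse/db/events" is a hypothetical table location.
    Dataset<Row> df = spark.read()
        .format("iceberg")
        // Ask the source to plan the scan on the cluster instead of on the driver.
        .option(PlanScanAction.ICEBERG_PLAN_MODE, PlanScanAction.PlanMode.DISTRIBUTED.name())
        .load("/tmp/warehouse/db/events");

    df.show();
  }
}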
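The planning dispatch that this patch adds to Reader (Spark 2) and SparkBatchScan (Spark 3) is spread across several hunks above, so the following sketch condenses it into a single static helper for readability. It mirrors the patch's call sites, including Actions.forTable(table).planScan().withContext(...).execute(), but it is not the code as committed: the wrapper class is invented, the generic type parameters dropped by the flattened diff are restored, and the logging in the fallback path is reduced to a comment.

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.actions.PlanScanAction.PlanMode;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class ScanPlanningSketch {
  // Distributed planning is only attempted for plain data table scans; if the
  // distributed action fails for any reason, the scan is planned locally with
  // the pre-existing planTasks() path, matching the behavior in the patch.
  static List<CombinedScanTask> plan(Table table, TableScan scan, PlanMode planMode) {
    if (planMode == PlanMode.DISTRIBUTED && scan instanceof DataTableScan) {
      DataTableScan dataScan = (DataTableScan) scan;
      try {
        return Lists.newArrayList(
            Actions.forTable(table).planScan().withContext(dataScan.tableScanContext()).execute());
      } catch (Exception e) {
        // Fall back to local planning below (the patch also logs this failure,
        // and rethrows it when the test-only plan-mode flag is enabled).
      }
    }
    try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
      return Lists.newArrayList(tasksIterable);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
    }
  }
}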
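Lastly, the second commit adds a test-only switch so the test suites fail loudly instead of silently falling back when distributed planning breaks. Below is a sketch of a session configured the way SparkTestBase and the per-suite startSpark() methods do it; the master setting is a placeholder.

import org.apache.iceberg.actions.PlanScanAction;
import org.apache.spark.sql.SparkSession;

public class PlanModeTestSession {
  public static SparkSession create() {
    return SparkSession.builder()
        .master("local[2]")  // placeholder; matches the local test sessions above
        // With this flag set to "true", Reader and SparkBatchScan rethrow any
        // exception from distributed planning instead of falling back to local planning.
        .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true")
        .getOrCreate();
  }
}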