diff --git a/api/src/main/java/com/netflix/iceberg/Files.java b/api/src/main/java/com/netflix/iceberg/Files.java
index b227199da829..f7397511e02f 100644
--- a/api/src/main/java/com/netflix/iceberg/Files.java
+++ b/api/src/main/java/com/netflix/iceberg/Files.java
@@ -75,6 +75,11 @@ public String location() {
return file.toString();
}
+ @Override
+ public InputFile toInputFile() {
+ return localInput(file);
+ }
+
@Override
public String toString() {
return location();
diff --git a/api/src/main/java/com/netflix/iceberg/ManifestFile.java b/api/src/main/java/com/netflix/iceberg/ManifestFile.java
new file mode 100644
index 000000000000..b1d919b08d06
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/ManifestFile.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+/**
+ * Represents a manifest file that can be scanned to find data files in a table.
+ */
+public interface ManifestFile {
+ Schema SCHEMA = new Schema(
+ required(500, "manifest_path", Types.StringType.get()),
+ required(501, "manifest_length", Types.LongType.get()),
+ required(502, "partition_spec_id", Types.IntegerType.get()),
+ optional(503, "added_snapshot_id", Types.LongType.get()),
+ optional(504, "added_data_files_count", Types.IntegerType.get()),
+ optional(505, "existing_data_files_count", Types.IntegerType.get()),
+ optional(506, "deleted_data_files_count", Types.IntegerType.get()),
+ optional(507, "partitions", Types.ListType.ofRequired(508, Types.StructType.of(
+ required(509, "contains_null", Types.BooleanType.get()),
+ optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
+ optional(511, "upper_bound", Types.BinaryType.get())
+ ))));
+
+ static Schema schema() {
+ return SCHEMA;
+ }
+
+ /**
+ * @return fully qualified path to the file, suitable for constructing a Hadoop Path
+ */
+ String path();
+
+ /**
+ * @return length of the manifest file
+ */
+ long length();
+
+ /**
+ * @return ID of the {@link PartitionSpec} used to write the manifest file
+ */
+ int partitionSpecId();
+
+ /**
+ * @return ID of the snapshot that added the manifest file to table metadata
+ */
+ Long snapshotId();
+
+ /**
+ * @return the number of data files with status ADDED in the manifest file
+ */
+ Integer addedFilesCount();
+
+ /**
+ * @return the number of data files with status EXISTING in the manifest file
+ */
+ Integer existingFilesCount();
+
+ /**
+ * @return the number of data files with status DELETED in the manifest file
+ */
+ Integer deletedFilesCount();
+
+ /**
+ * Returns a list of {@link PartitionFieldSummary partition field summaries}.
+ *
+ * Each summary corresponds to a field in the manifest file's partition spec, by ordinal. For
+ * example, the partition spec [ ts_day=date(ts), type=identity(type) ] will have 2 summaries.
+ * The first summary is for the ts_day partition field and the second is for the type partition
+ * field.
+ *
+ * @return a list of partition field summaries, one for each field in the manifest's spec
+ */
+ List<PartitionFieldSummary> partitions();
+
+ /**
+ * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
+ * this method to make defensive copies.
+ *
+ * @return a copy of this manifest file
+ */
+ ManifestFile copy();
+
+ /**
+ * Summarizes the values of one partition field stored in a manifest file.
+ */
+ interface PartitionFieldSummary {
+ Types.StructType TYPE = ManifestFile.schema()
+ .findType("partitions")
+ .asListType()
+ .elementType()
+ .asStructType();
+
+ static Types.StructType getType() {
+ return TYPE;
+ }
+
+ /**
+ * @return true if at least one data file in the manifest has a null value for the field
+ */
+ boolean containsNull();
+
+ /**
+ * @return a ByteBuffer that contains a serialized bound lower than all values of the field
+ */
+ ByteBuffer lowerBound();
+
+ /**
+ * @return a ByteBuffer that contains a serialized bound higher than all values of the field
+ */
+ ByteBuffer upperBound();
+
+ /**
+ * Copies this {@link PartitionFieldSummary summary}. Readers can reuse instances; use this
+ * method to make defensive copies.
+ *
+ * @return a copy of this partition field summary
+ */
+ PartitionFieldSummary copy();
+ }
+}
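
For orientation, a minimal sketch of consuming the new interface; the helper class is hypothetical and uses only the methods declared above:

    import com.netflix.iceberg.ManifestFile;
    import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;

    class ManifestFilePrinter {
      // summaries are ordered by the partition spec's field ordinals,
      // as documented on partitions()
      static void describe(ManifestFile manifest) {
        System.out.println(manifest.path() + " (" + manifest.length() + " bytes)");
        if (manifest.partitions() != null) {
          for (PartitionFieldSummary summary : manifest.partitions()) {
            System.out.println("  containsNull=" + summary.containsNull()
                + ", lower=" + summary.lowerBound() + ", upper=" + summary.upperBound());
          }
        }
      }
    }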
diff --git a/api/src/main/java/com/netflix/iceberg/Snapshot.java b/api/src/main/java/com/netflix/iceberg/Snapshot.java
index 7fc878b32697..89542dc3e32d 100644
--- a/api/src/main/java/com/netflix/iceberg/Snapshot.java
+++ b/api/src/main/java/com/netflix/iceberg/Snapshot.java
@@ -60,7 +60,7 @@ public interface Snapshot {
*
* @return a list of fully-qualified manifest locations
*/
- List<String> manifests();
+ List<ManifestFile> manifests();
/**
* Return all files added to the table in this snapshot.
@@ -81,4 +81,11 @@ public interface Snapshot {
* @return all files deleted from the table in this snapshot.
*/
Iterable<DataFile> deletedFiles();
+
+ /**
+ * Return the location of this snapshot's manifest list, or null if its manifests are stored
+ * directly in table metadata instead of in a separate list file.
+ *
+ * @return the location of the manifest list for this Snapshot
+ */
+ String manifestListLocation();
}
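
A short hedged sketch of branching on the new accessor (hypothetical helper; assumes only the Snapshot methods shown in this file):

    import com.netflix.iceberg.ManifestFile;
    import com.netflix.iceberg.Snapshot;

    class SnapshotInspector {
      static void printManifests(Snapshot snapshot) {
        if (snapshot.manifestListLocation() != null) {
          // manifests are tracked through a separate manifest list file
          System.out.println("manifest list: " + snapshot.manifestListLocation());
        }
        for (ManifestFile manifest : snapshot.manifests()) {
          System.out.println(manifest.path());
        }
      }
    }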
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java b/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
index 0106f35cfa32..5a836504bcad 100644
--- a/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
+++ b/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
@@ -55,6 +55,10 @@ public int fieldId() {
return fieldId;
}
+ public int pos() {
+ return pos;
+ }
+
public T get(StructLike struct) {
return struct.get(pos, javaType());
}
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java
new file mode 100644
index 000000000000..cac617d777a1
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.expressions;
+
+import com.netflix.iceberg.ManifestFile;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Types.StructType;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.netflix.iceberg.expressions.Expressions.rewriteNot;
+
+/**
+ * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether the file contains
+ * matching partitions.
+ *
+ * This evaluation is inclusive: it returns true if a file may match and false if it cannot match.
+ *
+ * Files are passed to {@link #eval(ManifestFile)}, which returns true if the manifest may contain
+ * data files that match the partition expression. Manifest files may be skipped if and only if the
+ * return value of {@code eval} is false.
+ */
+public class InclusiveManifestEvaluator {
+ private final StructType struct;
+ private final Expression expr;
+ private transient ThreadLocal<ManifestEvalVisitor> visitors = null;
+
+ private ManifestEvalVisitor visitor() {
+ if (visitors == null) {
+ this.visitors = ThreadLocal.withInitial(ManifestEvalVisitor::new);
+ }
+ return visitors.get();
+ }
+
+ public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) {
+ this.struct = spec.partitionType();
+ this.expr = Binder.bind(struct, rewriteNot(Projections.inclusive(spec).project(rowFilter)));
+ }
+
+ /**
+ * Test whether the file may contain records that match the expression.
+ *
+ * @param manifest a manifest file
+ * @return false if the file cannot contain rows that match the expression, true otherwise.
+ */
+ public boolean eval(ManifestFile manifest) {
+ return visitor().eval(manifest);
+ }
+
+ private static final boolean ROWS_MIGHT_MATCH = true;
+ private static final boolean ROWS_CANNOT_MATCH = false;
+
+ private class ManifestEvalVisitor extends BoundExpressionVisitor<Boolean> {
+ private List<PartitionFieldSummary> stats = null;
+
+ private boolean eval(ManifestFile manifest) {
+ this.stats = manifest.partitions();
+ if (stats == null) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ return ExpressionVisitors.visit(expr, this);
+ }
+
+ @Override
+ public Boolean alwaysTrue() {
+ return ROWS_MIGHT_MATCH; // all rows match
+ }
+
+ @Override
+ public Boolean alwaysFalse() {
+ return ROWS_CANNOT_MATCH; // all rows fail
+ }
+
+ @Override
+ public Boolean not(Boolean result) {
+ return !result;
+ }
+
+ @Override
+ public Boolean and(Boolean leftResult, Boolean rightResult) {
+ return leftResult && rightResult;
+ }
+
+ @Override
+ public Boolean or(Boolean leftResult, Boolean rightResult) {
+ return leftResult || rightResult;
+ }
+
+ @Override
+ public <T> Boolean isNull(BoundReference<T> ref) {
+ // no need to check whether the field is required because binding evaluates that case
+ // if the column has no null values, the expression cannot match
+ if (!stats.get(ref.pos()).containsNull()) {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean notNull(BoundReference<T> ref) {
+ // containsNull encodes whether at least one partition value is null, lowerBound is null if
+ // all partition values are null.
+ ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+ if (lowerBound == null) {
+ return ROWS_CANNOT_MATCH; // all values are null
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+ ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+ if (lowerBound == null) {
+ return ROWS_CANNOT_MATCH; // values are all null
+ }
+
+ T lower = Conversions.fromByteBuffer(ref.type(), lowerBound);
+
+ int cmp = lit.comparator().compare(lower, lit.value());
+ if (cmp >= 0) {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+ ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+ if (lowerBound == null) {
+ return ROWS_CANNOT_MATCH; // values are all null
+ }
+
+ T lower = Conversions.fromByteBuffer(ref.type(), lowerBound);
+
+ int cmp = lit.comparator().compare(lower, lit.value());
+ if (cmp > 0) {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+ ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+ if (upperBound == null) {
+ return ROWS_CANNOT_MATCH; // values are all null
+ }
+
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
+
+ int cmp = lit.comparator().compare(upper, lit.value());
+ if (cmp <= 0) {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+ ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+ if (upperBound == null) {
+ return ROWS_CANNOT_MATCH; // values are all null
+ }
+
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
+
+ int cmp = lit.comparator().compare(upper, lit.value());
+ if (cmp < 0) {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+ PartitionFieldSummary fieldStats = stats.get(ref.pos());
+ if (fieldStats.lowerBound() == null) {
+ return ROWS_CANNOT_MATCH; // values are all null and literal cannot contain null
+ }
+
+ T lower = Conversions.fromByteBuffer(ref.type(), fieldStats.lowerBound());
+ int cmp = lit.comparator().compare(lower, lit.value());
+ if (cmp > 0) {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ T upper = Conversions.fromByteBuffer(ref.type(), fieldStats.upperBound());
+ cmp = lit.comparator().compare(upper, lit.value());
+ if (cmp < 0) {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+ // because the bounds are not necessarily a min or max value, this cannot be answered using
+ // them. notEq(col, X) with bounds (X, Y) doesn't guarantee that X is a value in col.
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean in(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean notIn(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH;
+ }
+ }
+}
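
To make the inclusive contract concrete, a usage sketch; the helper class and filter are illustrative:

    import com.google.common.collect.Lists;
    import com.netflix.iceberg.ManifestFile;
    import com.netflix.iceberg.PartitionSpec;
    import com.netflix.iceberg.expressions.Expression;
    import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
    import java.util.List;

    class ManifestPruner {
      // a false result from eval() guarantees the manifest has no matching
      // files and can be skipped; true only means it may have matches
      static List<ManifestFile> prune(PartitionSpec spec, Expression filter,
                                      Iterable<ManifestFile> manifests) {
        InclusiveManifestEvaluator evaluator = new InclusiveManifestEvaluator(spec, filter);
        List<ManifestFile> matching = Lists.newArrayList();
        for (ManifestFile manifest : manifests) {
          if (evaluator.eval(manifest)) {
            matching.add(manifest);
          }
        }
        return matching;
      }
    }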
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/Literals.java b/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
index 22ef41cdfafc..f4e5d4e74446 100644
--- a/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
+++ b/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
@@ -96,6 +96,7 @@ private abstract static class BaseLiteral<T> implements Literal<T> {
private final T value;
BaseLiteral(T value) {
+ Preconditions.checkNotNull(value, "Literal values cannot be null");
this.value = value;
}
diff --git a/api/src/main/java/com/netflix/iceberg/io/OutputFile.java b/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
index 9d7580568f82..f0f48ee7f73b 100644
--- a/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
+++ b/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
@@ -58,4 +58,10 @@ public interface OutputFile {
*/
String location();
+ /**
+ * Return an {@link InputFile} for the location of this output file.
+ *
+ * @return an input file for the location of this output file
+ */
+ InputFile toInputFile();
}
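
A round-trip sketch of the new method, assuming the localInput/localOutput helpers in Files.java (the actual write is elided):

    import com.netflix.iceberg.Files;
    import com.netflix.iceberg.io.InputFile;
    import com.netflix.iceberg.io.OutputFile;
    import java.io.File;

    class RoundTrip {
      static InputFile writeThenRead(File location) {
        OutputFile out = Files.localOutput(location);
        // ... write content through out ...
        // the same location can now be handed back to a reader without
        // carrying a second path or file system reference around
        return out.toInputFile();
      }
    }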
diff --git a/api/src/main/java/com/netflix/iceberg/types/Comparators.java b/api/src/main/java/com/netflix/iceberg/types/Comparators.java
index 9e2ce2d9edee..6680f7dd16ff 100644
--- a/api/src/main/java/com/netflix/iceberg/types/Comparators.java
+++ b/api/src/main/java/com/netflix/iceberg/types/Comparators.java
@@ -19,10 +19,41 @@
package com.netflix.iceberg.types;
+import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
import java.util.Comparator;
public class Comparators {
+ private static final ImmutableMap<Type.PrimitiveType, Comparator<?>> COMPARATORS = ImmutableMap
+ .<Type.PrimitiveType, Comparator<?>>builder()
+ .put(Types.BooleanType.get(), Comparator.naturalOrder())
+ .put(Types.IntegerType.get(), Comparator.naturalOrder())
+ .put(Types.LongType.get(), Comparator.naturalOrder())
+ .put(Types.FloatType.get(), Comparator.naturalOrder())
+ .put(Types.DoubleType.get(), Comparator.naturalOrder())
+ .put(Types.DateType.get(), Comparator.naturalOrder())
+ .put(Types.TimeType.get(), Comparator.naturalOrder())
+ .put(Types.TimestampType.withZone(), Comparator.naturalOrder())
+ .put(Types.TimestampType.withoutZone(), Comparator.naturalOrder())
+ .put(Types.StringType.get(), Comparators.charSequences())
+ .put(Types.UUIDType.get(), Comparator.naturalOrder())
+ .put(Types.BinaryType.get(), Comparators.unsignedBytes())
+ .build();
+
+ @SuppressWarnings("unchecked")
+ public static <T> Comparator<T> forType(Type.PrimitiveType type) {
+ Comparator<?> cmp = COMPARATORS.get(type);
+ if (cmp != null) {
+ return (Comparator<T>) cmp;
+ } else if (type instanceof Types.FixedType) {
+ return (Comparator<T>) Comparators.unsignedBytes();
+ } else if (type instanceof Types.DecimalType) {
+ return (Comparator<T>) Comparator.naturalOrder();
+ }
+
+ throw new UnsupportedOperationException("Cannot determine comparator for type: " + type);
+ }
+
public static Comparator<ByteBuffer> unsignedBytes() {
return UnsignedByteBufComparator.INSTANCE;
}
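
A small usage sketch for the new lookup; the wrapper class and values are illustrative:

    import com.netflix.iceberg.types.Comparators;
    import com.netflix.iceberg.types.Types;
    import java.util.Comparator;

    class CompareExample {
      public static void main(String[] args) {
        // strings resolve to the CharSequence comparator registered above
        Comparator<CharSequence> cmp = Comparators.forType(Types.StringType.get());
        System.out.println(cmp.compare("a", "z") < 0); // true: "a" sorts first
      }
    }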
diff --git a/api/src/test/java/com/netflix/iceberg/TestHelpers.java b/api/src/test/java/com/netflix/iceberg/TestHelpers.java
index 118e11b33d6b..ceb1eed280cd 100644
--- a/api/src/test/java/com/netflix/iceberg/TestHelpers.java
+++ b/api/src/test/java/com/netflix/iceberg/TestHelpers.java
@@ -176,6 +176,107 @@ private static void handleException(String message,
}
}
+ public static class TestFieldSummary implements ManifestFile.PartitionFieldSummary {
+ private final boolean containsNull;
+ private final ByteBuffer lowerBound;
+ private final ByteBuffer upperBound;
+
+ public TestFieldSummary(boolean containsNull, ByteBuffer lowerBound, ByteBuffer upperBound) {
+ this.containsNull = containsNull;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
+
+ @Override
+ public boolean containsNull() {
+ return containsNull;
+ }
+
+ @Override
+ public ByteBuffer lowerBound() {
+ return lowerBound;
+ }
+
+ @Override
+ public ByteBuffer upperBound() {
+ return upperBound;
+ }
+
+ @Override
+ public ManifestFile.PartitionFieldSummary copy() {
+ return this;
+ }
+ }
+
+ public static class TestManifestFile implements ManifestFile {
+ private final String path;
+ private final long length;
+ private final int specId;
+ private final Long snapshotId;
+ private final Integer addedFiles;
+ private final Integer existingFiles;
+ private final Integer deletedFiles;
+ private final List<ManifestFile.PartitionFieldSummary> partitions;
+
+ public TestManifestFile(String path, long length, int specId, Long snapshotId,
+ Integer addedFiles, Integer existingFiles, Integer deletedFiles,
+ List<ManifestFile.PartitionFieldSummary> partitions) {
+ this.path = path;
+ this.length = length;
+ this.specId = specId;
+ this.snapshotId = snapshotId;
+ this.addedFiles = addedFiles;
+ this.existingFiles = existingFiles;
+ this.deletedFiles = deletedFiles;
+ this.partitions = partitions;
+ }
+
+ @Override
+ public String path() {
+ return path;
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public int partitionSpecId() {
+ return specId;
+ }
+
+ @Override
+ public Long snapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public Integer addedFilesCount() {
+ return addedFiles;
+ }
+
+ @Override
+ public Integer existingFilesCount() {
+ return existingFiles;
+ }
+
+ @Override
+ public Integer deletedFilesCount() {
+ return deletedFiles;
+ }
+
+ @Override
+ public List<ManifestFile.PartitionFieldSummary> partitions() {
+ return partitions;
+ }
+
+ @Override
+ public ManifestFile copy() {
+ return this;
+ }
+ }
+
public static class TestDataFile implements DataFile {
private final String path;
private final StructLike partition;
diff --git a/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java b/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
index aab66a3120b2..1253f1e91017 100644
--- a/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
+++ b/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
@@ -34,6 +34,7 @@ public class TestPartitionPaths {
);
@Test
+ @SuppressWarnings("unchecked")
public void testPartitionPath() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
.hour("ts")
diff --git a/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java
new file mode 100644
index 000000000000..f92f70014fe1
--- /dev/null
+++ b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.expressions;
+
+import com.google.common.collect.ImmutableList;
+import com.netflix.iceberg.ManifestFile;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.TestHelpers;
+import com.netflix.iceberg.exceptions.ValidationException;
+import com.netflix.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static com.netflix.iceberg.expressions.Expressions.and;
+import static com.netflix.iceberg.expressions.Expressions.equal;
+import static com.netflix.iceberg.expressions.Expressions.greaterThan;
+import static com.netflix.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.isNull;
+import static com.netflix.iceberg.expressions.Expressions.lessThan;
+import static com.netflix.iceberg.expressions.Expressions.lessThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.not;
+import static com.netflix.iceberg.expressions.Expressions.notEqual;
+import static com.netflix.iceberg.expressions.Expressions.notNull;
+import static com.netflix.iceberg.expressions.Expressions.or;
+import static com.netflix.iceberg.types.Conversions.toByteBuffer;
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+public class TestInclusiveManifestEvaluator {
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ optional(4, "all_nulls", Types.StringType.get()),
+ optional(5, "some_nulls", Types.StringType.get()),
+ optional(6, "no_nulls", Types.StringType.get())
+ );
+
+ private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+ .withSpecId(0)
+ .identity("id")
+ .identity("all_nulls")
+ .identity("some_nulls")
+ .identity("no_nulls")
+ .build();
+
+ private static final ByteBuffer INT_MIN = toByteBuffer(Types.IntegerType.get(), 30);
+ private static final ByteBuffer INT_MAX = toByteBuffer(Types.IntegerType.get(), 79);
+
+ private static final ByteBuffer STRING_MIN = toByteBuffer(Types.StringType.get(), "a");
+ private static final ByteBuffer STRING_MAX = toByteBuffer(Types.StringType.get(), "z");
+
+ private static final ManifestFile NO_STATS = new TestHelpers.TestManifestFile(
+ "manifest-list.avro", 1024, 0, System.currentTimeMillis(), null, null, null, null);
+
+ private static final ManifestFile FILE = new TestHelpers.TestManifestFile("manifest-list.avro",
+ 1024, 0, System.currentTimeMillis(), 5, 10, 0, ImmutableList.of(
+ new TestHelpers.TestFieldSummary(false, INT_MIN, INT_MAX),
+ new TestHelpers.TestFieldSummary(true, null, null),
+ new TestHelpers.TestFieldSummary(true, STRING_MIN, STRING_MAX),
+ new TestHelpers.TestFieldSummary(false, STRING_MIN, STRING_MAX)));
+
+ @Test
+ public void testAllNulls() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("all_nulls")).eval(FILE);
+ Assert.assertFalse("Should skip: no non-null value in all null column", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("some_nulls")).eval(FILE);
+ Assert.assertTrue("Should read: column with some nulls contains a non-null value", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("no_nulls")).eval(FILE);
+ Assert.assertTrue("Should read: non-null column contains a non-null value", shouldRead);
+ }
+
+ @Test
+ public void testNoNulls() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("all_nulls")).eval(FILE);
+ Assert.assertTrue("Should read: at least one null value in all null column", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("some_nulls")).eval(FILE);
+ Assert.assertTrue("Should read: column with some nulls contains a null value", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("no_nulls")).eval(FILE);
+ Assert.assertFalse("Should skip: non-null column contains no null values", shouldRead);
+ }
+
+ @Test
+ public void testMissingColumn() {
+ TestHelpers.assertThrows("Should complain about missing column in expression",
+ ValidationException.class, "Cannot find field 'missing'",
+ () -> new InclusiveManifestEvaluator(SPEC, lessThan("missing", 5)).eval(FILE));
+ }
+
+ @Test
+ public void testMissingStats() {
+ Expression[] exprs = new Expression[] {
+ lessThan("id", 5), lessThanOrEqual("id", 30), equal("id", 70),
+ greaterThan("id", 78), greaterThanOrEqual("id", 90), notEqual("id", 101),
+ isNull("id"), notNull("id")
+ };
+
+ for (Expression expr : exprs) {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, expr).eval(NO_STATS);
+ Assert.assertTrue("Should read when missing stats for expr: " + expr, shouldRead);
+ }
+ }
+
+ @Test
+ public void testNot() {
+ // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(lessThan("id", 5))).eval(FILE);
+ Assert.assertTrue("Should read: not(false)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, not(greaterThan("id", 5))).eval(FILE);
+ Assert.assertFalse("Should skip: not(true)", shouldRead);
+ }
+
+ @Test
+ public void testAnd() {
+ // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+ boolean shouldRead = new InclusiveManifestEvaluator(
+ SPEC, and(lessThan("id", 5), greaterThanOrEqual("id", 0))).eval(FILE);
+ Assert.assertFalse("Should skip: and(false, false)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(
+ SPEC, and(greaterThan("id", 5), lessThanOrEqual("id", 30))).eval(FILE);
+ Assert.assertTrue("Should read: and(true, true)", shouldRead);
+ }
+
+ @Test
+ public void testOr() {
+ // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+ boolean shouldRead = new InclusiveManifestEvaluator(
+ SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 80))).eval(FILE);
+ Assert.assertFalse("Should skip: or(false, false)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(
+ SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 60))).eval(FILE);
+ Assert.assertTrue("Should read: or(false, true)", shouldRead);
+ }
+
+ @Test
+ public void testIntegerLt() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 5)).eval(FILE);
+ Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 30)).eval(FILE);
+ Assert.assertFalse("Should not read: id range below lower bound (30 is not < 30)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 31)).eval(FILE);
+ Assert.assertTrue("Should read: one possible id", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 79)).eval(FILE);
+ Assert.assertTrue("Should read: may possible ids", shouldRead);
+ }
+
+ @Test
+ public void testIntegerLtEq() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 5)).eval(FILE);
+ Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 29)).eval(FILE);
+ Assert.assertFalse("Should not read: id range below lower bound (29 < 30)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 30)).eval(FILE);
+ Assert.assertTrue("Should read: one possible id", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 79)).eval(FILE);
+ Assert.assertTrue("Should read: many possible ids", shouldRead);
+ }
+
+ @Test
+ public void testIntegerGt() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 85)).eval(FILE);
+ Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 79)).eval(FILE);
+ Assert.assertFalse("Should not read: id range above upper bound (79 is not > 79)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 78)).eval(FILE);
+ Assert.assertTrue("Should read: one possible id", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 75)).eval(FILE);
+ Assert.assertTrue("Should read: may possible ids", shouldRead);
+ }
+
+ @Test
+ public void testIntegerGtEq() {
+ boolean shouldRead = new InclusiveManifestEvaluator(
+ SPEC, greaterThanOrEqual("id", 85)).eval(FILE);
+ Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(
+ SPEC, greaterThanOrEqual("id", 80)).eval(FILE);
+ Assert.assertFalse("Should not read: id range above upper bound (80 > 79)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(
+ SPEC, greaterThanOrEqual("id", 79)).eval(FILE);
+ Assert.assertTrue("Should read: one possible id", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(
+ SPEC, greaterThanOrEqual("id", 75)).eval(FILE);
+ Assert.assertTrue("Should read: may possible ids", shouldRead);
+ }
+
+ @Test
+ public void testIntegerEq() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 5)).eval(FILE);
+ Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 29)).eval(FILE);
+ Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 30)).eval(FILE);
+ Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 75)).eval(FILE);
+ Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 79)).eval(FILE);
+ Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 80)).eval(FILE);
+ Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 85)).eval(FILE);
+ Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+ }
+
+ @Test
+ public void testIntegerNotEq() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 5)).eval(FILE);
+ Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 29)).eval(FILE);
+ Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 30)).eval(FILE);
+ Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 75)).eval(FILE);
+ Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 79)).eval(FILE);
+ Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 80)).eval(FILE);
+ Assert.assertTrue("Should read: id above upper bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 85)).eval(FILE);
+ Assert.assertTrue("Should read: id above upper bound", shouldRead);
+ }
+
+ @Test
+ public void testIntegerNotEqRewritten() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 5))).eval(FILE);
+ Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 29))).eval(FILE);
+ Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 30))).eval(FILE);
+ Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 75))).eval(FILE);
+ Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 79))).eval(FILE);
+ Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 80))).eval(FILE);
+ Assert.assertTrue("Should read: id above upper bound", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 85))).eval(FILE);
+ Assert.assertTrue("Should read: id above upper bound", shouldRead);
+ }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
index 5409b9a7ebe0..945ddbbb59b6 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
@@ -20,21 +20,25 @@
package com.netflix.iceberg;
import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.netflix.iceberg.avro.Avro;
import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.io.CloseableGroup;
+import com.netflix.iceberg.io.CloseableIterable;
+import com.netflix.iceberg.io.InputFile;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-class BaseSnapshot extends CloseableGroup implements Snapshot {
+class BaseSnapshot implements Snapshot {
private final TableOperations ops;
private final long snapshotId;
private final Long parentId;
private final long timestampMillis;
- private final List<String> manifestFiles;
+ private final InputFile manifestList;
// lazily initialized
+ private List<ManifestFile> manifests = null;
private List<DataFile> adds = null;
private List<DataFile> deletes = null;
@@ -44,19 +48,30 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
BaseSnapshot(TableOperations ops,
long snapshotId,
String... manifestFiles) {
- this(ops, snapshotId, null, System.currentTimeMillis(), Arrays.asList(manifestFiles));
+ this(ops, snapshotId, null, System.currentTimeMillis(),
+ Lists.transform(Arrays.asList(manifestFiles),
+ path -> new GenericManifestFile(ops.newInputFile(path), 0)));
}
BaseSnapshot(TableOperations ops,
long snapshotId,
Long parentId,
long timestampMillis,
- List<String> manifestFiles) {
+ InputFile manifestList) {
this.ops = ops;
this.snapshotId = snapshotId;
this.parentId = parentId;
this.timestampMillis = timestampMillis;
- this.manifestFiles = manifestFiles;
+ this.manifestList = manifestList;
+ }
+
+ BaseSnapshot(TableOperations ops,
+ long snapshotId,
+ Long parentId,
+ long timestampMillis,
+ List<ManifestFile> manifests) {
+ this(ops, snapshotId, parentId, timestampMillis, (InputFile) null);
+ this.manifests = manifests;
}
@Override
@@ -75,8 +90,25 @@ public long timestampMillis() {
}
@Override
- public List<String> manifests() {
- return manifestFiles;
+ public List<ManifestFile> manifests() {
+ if (manifests == null) {
+ // if manifests isn't set, then manifestList is set and should be read to get the list
+ try (CloseableIterable<ManifestFile> files = Avro.read(manifestList)
+ .rename("manifest_file", GenericManifestFile.class.getName())
+ .rename("partitions", GenericPartitionFieldSummary.class.getName())
+ .rename("r508", GenericPartitionFieldSummary.class.getName())
+ .project(ManifestFile.schema())
+ .reuseContainers(false)
+ .build()) {
+
+ this.manifests = Lists.newLinkedList(files);
+
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Cannot read snapshot file: %s", manifestList.location());
+ }
+ }
+
+ return manifests;
}
@Override
@@ -95,13 +127,18 @@ public List<DataFile> deletedFiles() {
return deletes;
}
+ @Override
+ public String manifestListLocation() {
+ return manifestList != null ? manifestList.location() : null;
+ }
+
private void cacheChanges() {
List<DataFile> adds = Lists.newArrayList();
List<DataFile> deletes = Lists.newArrayList();
// accumulate adds and deletes from all manifests.
// because manifests can be reused in newer snapshots, filter the changes by snapshot id.
- for (String manifest : manifestFiles) {
+ for (String manifest : Iterables.transform(manifests, ManifestFile::path)) {
try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
for (ManifestEntry add : reader.addedFiles()) {
if (add.snapshotId() == snapshotId) {
@@ -127,7 +164,7 @@ public String toString() {
return Objects.toStringHelper(this)
.add("id", snapshotId)
.add("timestamp_ms", timestampMillis)
- .add("manifests", manifestFiles)
+ .add("manifests", manifests)
.toString();
}
}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index e99889e959d9..ad207807c6e0 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -21,6 +21,9 @@
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -30,6 +33,8 @@
import com.netflix.iceberg.expressions.Binder;
import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
+import com.netflix.iceberg.expressions.Projections;
import com.netflix.iceberg.expressions.ResidualEvaluator;
import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.types.TypeUtil;
@@ -141,6 +146,16 @@ public TableScan filter(Expression expr) {
return new BaseTableScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr));
}
+ private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+ .newBuilder()
+ .build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
+ @Override
+ public InclusiveManifestEvaluator load(Integer specId) {
+ PartitionSpec spec = ops.current().spec(specId);
+ return new InclusiveManifestEvaluator(spec, rowFilter);
+ }
+ });
+
@Override
public CloseableIterable<FileScanTask> planFiles() {
Snapshot snapshot = snapshotId != null ?
@@ -155,11 +170,14 @@ public CloseableIterable<FileScanTask> planFiles() {
Listeners.notifyAll(
new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema));
+ Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
+ manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
+
ConcurrentLinkedQueue toClose = new ConcurrentLinkedQueue<>();
Iterable> readers = Iterables.transform(
- snapshot.manifests(),
+ matchingManifests,
manifest -> {
- ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+ ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
toClose.add(reader);
String schemaString = SchemaParser.toJson(reader.spec().schema());
String specString = PartitionSpecParser.toJson(reader.spec());
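
The caller-visible effect, as a hedged sketch against the pre-existing Table and TableScan entry points (names and the filter are illustrative):

    import com.netflix.iceberg.FileScanTask;
    import com.netflix.iceberg.Table;
    import com.netflix.iceberg.expressions.Expressions;
    import com.netflix.iceberg.io.CloseableIterable;
    import java.io.IOException;

    class ScanExample {
      static void scan(Table table) throws IOException {
        // manifests whose partition summaries cannot match the filter are
        // pruned during planning and never opened
        try (CloseableIterable<FileScanTask> tasks =
                 table.newScan().filter(Expressions.equal("id", 5)).planFiles()) {
          for (FileScanTask task : tasks) {
            System.out.println(task.file().path());
          }
        }
      }
    }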
diff --git a/core/src/main/java/com/netflix/iceberg/FastAppend.java b/core/src/main/java/com/netflix/iceberg/FastAppend.java
index de34545a018f..278f05936423 100644
--- a/core/src/main/java/com/netflix/iceberg/FastAppend.java
+++ b/core/src/main/java/com/netflix/iceberg/FastAppend.java
@@ -35,7 +35,7 @@
class FastAppend extends SnapshotUpdate implements AppendFiles {
private final PartitionSpec spec;
private final List<DataFile> newFiles = Lists.newArrayList();
- private String newManifestLocation = null;
+ private ManifestFile newManifest = null;
private boolean hasNewFiles = false;
FastAppend(TableOperations ops) {
@@ -51,11 +51,15 @@ public FastAppend appendFile(DataFile file) {
}
@Override
- public List<String> apply(TableMetadata base) {
- String location = writeManifest();
+ public List<ManifestFile> apply(TableMetadata base) {
+ List<ManifestFile> newManifests = Lists.newArrayList();
+
+ try {
+ newManifests.add(writeManifest());
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to write manifest");
+ }
- List<String> newManifests = Lists.newArrayList();
- newManifests.add(location);
if (base.currentSnapshot() != null) {
newManifests.addAll(base.currentSnapshot().manifests());
}
@@ -64,33 +68,32 @@ public List<ManifestFile> apply(TableMetadata base) {
}
@Override
- protected void cleanUncommitted(Set<String> committed) {
- if (!committed.contains(newManifestLocation)) {
- deleteFile(newManifestLocation);
+ protected void cleanUncommitted(Set<ManifestFile> committed) {
+ if (!committed.contains(newManifest)) {
+ deleteFile(newManifest.path());
}
}
- private String writeManifest() {
- if (hasNewFiles && newManifestLocation != null) {
- deleteFile(newManifestLocation);
- hasNewFiles = false;
- newManifestLocation = null;
+ private ManifestFile writeManifest() throws IOException {
+ if (hasNewFiles && newManifest != null) {
+ deleteFile(newManifest.path());
+ newManifest = null;
}
- if (newManifestLocation == null) {
+ if (newManifest == null) {
OutputFile out = manifestPath(0);
- try (ManifestWriter writer = new ManifestWriter(spec, out, snapshotId())) {
-
+ ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+ try {
writer.addAll(newFiles);
-
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to write manifest: %s", out);
+ } finally {
+ writer.close();
}
- this.newManifestLocation = out.location();
+ this.newManifest = writer.toManifestFile();
+ hasNewFiles = false;
}
- return newManifestLocation;
+ return newManifest;
}
}
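
The write-then-describe pattern in isolation, as a sketch; the parameters stand in for the fields FastAppend uses, and it assumes ManifestWriter's package-private constructor shown above:

    import com.netflix.iceberg.DataFile;
    import com.netflix.iceberg.ManifestFile;
    import com.netflix.iceberg.PartitionSpec;
    import com.netflix.iceberg.io.OutputFile;
    import java.io.IOException;
    import java.util.List;

    class AppendSketch { // would live in com.netflix.iceberg alongside ManifestWriter
      static ManifestFile write(PartitionSpec spec, OutputFile out, long snapshotId,
                                List<DataFile> files) throws IOException {
        ManifestWriter writer = new ManifestWriter(spec, out, snapshotId);
        try {
          writer.addAll(files);
        } finally {
          // close before toManifestFile() so file counts and partition
          // summaries reflect everything that was written
          writer.close();
        }
        return writer.toManifestFile();
      }
    }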
diff --git a/core/src/main/java/com/netflix/iceberg/FileHistory.java b/core/src/main/java/com/netflix/iceberg/FileHistory.java
index 3ed800626637..60146b0c8b9a 100644
--- a/core/src/main/java/com/netflix/iceberg/FileHistory.java
+++ b/core/src/main/java/com/netflix/iceberg/FileHistory.java
@@ -32,6 +32,9 @@
import java.util.List;
import java.util.Set;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.transform;
+
public class FileHistory {
private static final List<String> HISTORY_COLUMNS = ImmutableList.of("file_path");
@@ -91,12 +94,17 @@ public Iterable<ManifestEntry> build() {
snapshots = Iterables.filter(snapshots, snap -> snap.timestampMillis() <= endTime);
}
+ // only use manifests that were added in the matching snapshots
+ Set matchingIds = Sets.newHashSet(transform(snapshots, snap -> snap.snapshotId()));
+ Iterable manifests = Iterables.filter(
+ concat(transform(snapshots, Snapshot::manifests)),
+ manifest -> manifest.snapshotId() == null || matchingIds.contains(manifest.snapshotId()));
+
// a manifest group will only read each manifest once
- ManifestGroup manifests = new ManifestGroup(((HasTableOperations) table).operations(),
- Iterables.concat(Iterables.transform(snapshots, Snapshot::manifests)));
+ ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations(), manifests);
List<ManifestEntry> results = Lists.newArrayList();
- try (CloseableIterable<ManifestEntry> entries = manifests.select(HISTORY_COLUMNS).entries()) {
+ try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
// TODO: replace this with an IN predicate
CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
for (ManifestEntry entry : entries) {
diff --git a/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java b/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java
new file mode 100644
index 000000000000..628515bc98f7
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.base.Objects;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import java.io.Serializable;
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.copyOf;
+import static com.google.common.collect.Iterables.transform;
+
+public class GenericManifestFile
+ implements ManifestFile, StructLike, IndexedRecord, SchemaConstructable, Serializable {
+ private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(
+ ManifestFile.schema(), "manifest_file");
+
+ private transient Schema avroSchema; // not final for Java serialization
+ private int[] fromProjectionPos;
+
+ // data fields
+ private InputFile file = null;
+ private String manifestPath = null;
+ private Long length = null;
+ private int specId = -1;
+ private Long snapshotId = null;
+ private Integer addedFilesCount = null;
+ private Integer existingFilesCount = null;
+ private Integer deletedFilesCount = null;
+ private List<PartitionFieldSummary> partitions = null;
+
+ /**
+ * Used by Avro reflection to instantiate this class when reading manifest files.
+ */
+ public GenericManifestFile(org.apache.avro.Schema avroSchema) {
+ this.avroSchema = avroSchema;
+
+ List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema)
+ .asNestedType()
+ .asStructType()
+ .fields();
+ List<Types.NestedField> allFields = ManifestFile.schema().asStruct().fields();
+
+ this.fromProjectionPos = new int[fields.size()];
+ for (int i = 0; i < fromProjectionPos.length; i += 1) {
+ boolean found = false;
+ for (int j = 0; j < allFields.size(); j += 1) {
+ if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
+ found = true;
+ fromProjectionPos[i] = j;
+ }
+ }
+
+ if (!found) {
+ throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
+ }
+ }
+ }
+
+ GenericManifestFile(InputFile file, int specId) {
+ this.avroSchema = AVRO_SCHEMA;
+ this.file = file;
+ this.manifestPath = file.location();
+ this.length = null; // lazily loaded from file
+ this.specId = specId;
+ this.snapshotId = null;
+ this.addedFilesCount = null;
+ this.existingFilesCount = null;
+ this.deletedFilesCount = null;
+ this.partitions = null;
+ this.fromProjectionPos = null;
+ }
+
+ public GenericManifestFile(String path, long length, int specId, long snapshotId,
+ int addedFilesCount, int existingFilesCount, int deletedFilesCount,
+ List<PartitionFieldSummary> partitions) {
+ this.avroSchema = AVRO_SCHEMA;
+ this.manifestPath = path;
+ this.length = length;
+ this.specId = specId;
+ this.snapshotId = snapshotId;
+ this.addedFilesCount = addedFilesCount;
+ this.existingFilesCount = existingFilesCount;
+ this.deletedFilesCount = deletedFilesCount;
+ this.partitions = partitions;
+ this.fromProjectionPos = null;
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param toCopy a generic manifest file to copy.
+ */
+ private GenericManifestFile(GenericManifestFile toCopy) {
+ this.avroSchema = toCopy.avroSchema;
+ this.manifestPath = toCopy.manifestPath;
+ this.length = toCopy.length;
+ this.specId = toCopy.specId;
+ this.snapshotId = toCopy.snapshotId;
+ this.addedFilesCount = toCopy.addedFilesCount;
+ this.existingFilesCount = toCopy.existingFilesCount;
+ this.deletedFilesCount = toCopy.deletedFilesCount;
+ this.partitions = copyOf(transform(toCopy.partitions, PartitionFieldSummary::copy));
+ this.fromProjectionPos = toCopy.fromProjectionPos;
+ }
+
+ /**
+ * Constructor for Java serialization.
+ */
+ GenericManifestFile() {
+ }
+
+ @Override
+ public String path() {
+ return manifestPath;
+ }
+
+ public Long lazyLength() {
+ if (length == null) {
+ if (file != null) {
+ // this was created from an input file and length is lazily loaded
+ this.length = file.getLength();
+ } else {
+ // this was loaded from a file without projecting length, so the length is unknown
+ return null;
+ }
+ }
+ return length;
+ }
+
+ @Override
+ public long length() {
+ return lazyLength();
+ }
+
+ @Override
+ public int partitionSpecId() {
+ return specId;
+ }
+
+ @Override
+ public Long snapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public Integer addedFilesCount() {
+ return addedFilesCount;
+ }
+
+ @Override
+ public Integer existingFilesCount() {
+ return existingFilesCount;
+ }
+
+ @Override
+ public Integer deletedFilesCount() {
+ return deletedFilesCount;
+ }
+
+ @Override
+ public List<PartitionFieldSummary> partitions() {
+ return partitions;
+ }
+
+ @Override
+ public int size() {
+ return ManifestFile.schema().columns().size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ return javaClass.cast(get(pos));
+ }
+
+ @Override
+ public void put(int i, Object v) {
+ set(i, v);
+ }
+
+ @Override
+ public Object get(int i) {
+ int pos = i;
+ // if the schema was projected, map the incoming ordinal to the expected one
+ if (fromProjectionPos != null) {
+ pos = fromProjectionPos[i];
+ }
+ switch (pos) {
+ case 0:
+ return manifestPath;
+ case 1:
+ return lazyLength();
+ case 2:
+ return specId;
+ case 3:
+ return snapshotId;
+ case 4:
+ return addedFilesCount;
+ case 5:
+ return existingFilesCount;
+ case 6:
+ return deletedFilesCount;
+ case 7:
+ return partitions;
+ default:
+ throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> void set(int i, T value) {
+ int pos = i;
+ // if the schema was projected, map the incoming ordinal to the expected one
+ if (fromProjectionPos != null) {
+ pos = fromProjectionPos[i];
+ }
+ switch (pos) {
+ case 0:
+ // always coerce to String for Serializable
+ this.manifestPath = value.toString();
+ return;
+ case 1:
+ this.length = (Long) value;
+ return;
+ case 2:
+ this.specId = (Integer) value;
+ return;
+ case 3:
+ this.snapshotId = (Long) value;
+ return;
+ case 4:
+ this.addedFilesCount = (Integer) value;
+ return;
+ case 5:
+ this.existingFilesCount = (Integer) value;
+ return;
+ case 6:
+ this.deletedFilesCount = (Integer) value;
+ return;
+ case 7:
+ this.partitions = (List<PartitionFieldSummary>) value;
+ return;
+ default:
+ // ignore the object, it must be from a newer version of the format
+ }
+ }
+
+ @Override
+ public ManifestFile copy() {
+ return new GenericManifestFile(this);
+ }
+
+ @Override
+ public Schema getSchema() {
+ return avroSchema;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ GenericManifestFile that = (GenericManifestFile) other;
+ return Objects.equal(manifestPath, that.manifestPath);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(manifestPath);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("path", manifestPath)
+ .add("length", length)
+ .add("partition_spec_id", specId)
+ .add("added_snapshot_id", snapshotId)
+ .add("added_data_files_count", addedFilesCount)
+ .add("existing_data_files_count", existingFilesCount)
+ .add("deleted_data_files_count", deletedFilesCount)
+ .add("partitions", partitions)
+ .toString();
+ }
+}
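
A construction sketch for manifest-list entries using the public constructor above; the path, counts, and bounds are illustrative:

    import com.google.common.collect.ImmutableList;
    import com.netflix.iceberg.GenericManifestFile;
    import com.netflix.iceberg.GenericPartitionFieldSummary;
    import com.netflix.iceberg.ManifestFile;
    import com.netflix.iceberg.types.Conversions;
    import com.netflix.iceberg.types.Types;

    class ManifestEntryExample {
      static ManifestFile example() {
        ManifestFile entry = new GenericManifestFile(
            "hdfs://nn:8020/tables/t/metadata/manifest-001.avro", // manifest_path
            4096L, // manifest_length
            0,     // partition_spec_id
            1L,    // added_snapshot_id
            5, 0, 0, // added / existing / deleted data file counts
            ImmutableList.<ManifestFile.PartitionFieldSummary>of(
                new GenericPartitionFieldSummary(false,
                    Conversions.toByteBuffer(Types.IntegerType.get(), 0),
                    Conversions.toByteBuffer(Types.IntegerType.get(), 100))));
        // readers may reuse instances, so hold a defensive copy
        return entry.copy();
      }
    }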
diff --git a/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java b/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java
new file mode 100644
index 000000000000..0c57cb3cd7c4
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.base.Objects;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class GenericPartitionFieldSummary
+ implements PartitionFieldSummary, StructLike, IndexedRecord, SchemaConstructable, Serializable {
+ private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(PartitionFieldSummary.getType());
+
+ private transient Schema avroSchema; // not final for Java serialization
+ private int[] fromProjectionPos;
+
+ // data fields
+ private boolean containsNull = false;
+ private ByteBuffer lowerBound = null;
+ private ByteBuffer upperBound = null;
+
+ /**
+ * Used by Avro reflection to instantiate this class when reading manifest files.
+ */
+ public GenericPartitionFieldSummary(Schema avroSchema) {
+ this.avroSchema = avroSchema;
+
+ List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema)
+ .asNestedType()
+ .asStructType()
+ .fields();
+ List<Types.NestedField> allFields = PartitionFieldSummary.getType().fields();
+
+ this.fromProjectionPos = new int[fields.size()];
+ for (int i = 0; i < fromProjectionPos.length; i += 1) {
+ boolean found = false;
+ for (int j = 0; j < allFields.size(); j += 1) {
+ if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
+ found = true;
+ fromProjectionPos[i] = j;
+ }
+ }
+
+ if (!found) {
+ throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
+ }
+ }
+ }
+
+ public GenericPartitionFieldSummary(boolean containsNull, ByteBuffer lowerBound,
+ ByteBuffer upperBound) {
+ this.avroSchema = AVRO_SCHEMA;
+ this.containsNull = containsNull;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ this.fromProjectionPos = null;
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param toCopy a generic partition field summary to copy.
+ */
+ private GenericPartitionFieldSummary(GenericPartitionFieldSummary toCopy) {
+ this.avroSchema = toCopy.avroSchema;
+ this.containsNull = toCopy.containsNull;
+ this.lowerBound = toCopy.lowerBound;
+ this.upperBound = toCopy.upperBound;
+ this.fromProjectionPos = toCopy.fromProjectionPos;
+ }
+
+ /**
+ * Constructor for Java serialization.
+ */
+ GenericPartitionFieldSummary() {
+ }
+
+ @Override
+ public boolean containsNull() {
+ return containsNull;
+ }
+
+ @Override
+ public ByteBuffer lowerBound() {
+ return lowerBound;
+ }
+
+ @Override
+ public ByteBuffer upperBound() {
+ return upperBound;
+ }
+
+ @Override
+ public int size() {
+ return PartitionFieldSummary.getType().fields().size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ return javaClass.cast(get(pos));
+ }
+
+ @Override
+ public void put(int i, Object v) {
+ set(i, v);
+ }
+
+ @Override
+ public Object get(int i) {
+ int pos = i;
+ // if the schema was projected, map the incoming ordinal to the expected one
+ if (fromProjectionPos != null) {
+ pos = fromProjectionPos[i];
+ }
+ switch (pos) {
+ case 0:
+ return containsNull;
+ case 1:
+ return lowerBound;
+ case 2:
+ return upperBound;
+ default:
+ throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> void set(int i, T value) {
+ int pos = i;
+ // if the schema was projected, map the incoming ordinal to the expected one
+ if (fromProjectionPos != null) {
+ pos = fromProjectionPos[i];
+ }
+ switch (pos) {
+ case 0:
+ this.containsNull = (Boolean) value;
+ return;
+ case 1:
+ this.lowerBound = (ByteBuffer) value;
+ return;
+ case 2:
+ this.upperBound = (ByteBuffer) value;
+ return;
+ default:
+ // ignore the object, it must be from a newer version of the format
+ }
+ }
+
+ @Override
+ public PartitionFieldSummary copy() {
+ return new GenericPartitionFieldSummary(this);
+ }
+
+ @Override
+ public Schema getSchema() {
+ return avroSchema;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("contains_null", containsNull)
+ .add("lower_bound", lowerBound)
+ .add("upper_bound", upperBound)
+ .toString();
+ }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
index f343427db110..19d993f919a5 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
@@ -37,18 +37,18 @@ class ManifestGroup {
private static final Types.StructType EMPTY_STRUCT = Types.StructType.of();
private final TableOperations ops;
- private final Set<String> manifests;
+ private final Set<ManifestFile> manifests;
private final Expression dataFilter;
private final Expression fileFilter;
private final boolean ignoreDeleted;
private final List<String> columns;
- ManifestGroup(TableOperations ops, Iterable<String> manifests) {
+ ManifestGroup(TableOperations ops, Iterable<ManifestFile> manifests) {
this(ops, Sets.newHashSet(manifests), Expressions.alwaysTrue(), Expressions.alwaysTrue(),
false, ImmutableList.of("*"));
}
- private ManifestGroup(TableOperations ops, Set<String> manifests,
+ private ManifestGroup(TableOperations ops, Set<ManifestFile> manifests,
Expression dataFilter, Expression fileFilter, boolean ignoreDeleted,
List<String> columns) {
this.ops = ops;
@@ -94,10 +94,20 @@ public CloseableIterable<ManifestEntry> entries() {
Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
List<Closeable> toClose = Lists.newArrayList();
+ Iterable<ManifestFile> matchingManifests = manifests;
+
+ if (ignoreDeleted) {
+ // remove any manifests that don't have any existing or added files. if either the added or
+ // existing files count is missing, the manifest must be scanned.
+ matchingManifests = Iterables.filter(manifests, manifest ->
+ manifest.addedFilesCount() == null || manifest.existingFilesCount() == null ||
+ manifest.addedFilesCount() + manifest.existingFilesCount() > 0);
+ }
+
Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
- manifests,
+ matchingManifests,
manifest -> {
- ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+ ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
toClose.add(reader);
return Iterables.filter(
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
new file mode 100644
index 000000000000..98cdbbfb8f58
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import com.netflix.iceberg.avro.Avro;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.io.OutputFile;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+class ManifestListWriter implements FileAppender<ManifestFile> {
+ private final FileAppender<ManifestFile> writer;
+
+ ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+ this.writer = newAppender(snapshotFile, ImmutableMap.of(
+ "snapshot-id", String.valueOf(snapshotId),
+ "parent-snapshot-id", String.valueOf(parentSnapshotId)));
+ }
+
+ @Override
+ public void add(ManifestFile file) {
+ writer.add(file);
+ }
+
+ @Override
+ public void addAll(Iterator<ManifestFile> values) {
+ writer.addAll(values);
+ }
+
+ @Override
+ public void addAll(Iterable<ManifestFile> values) {
+ writer.addAll(values);
+ }
+
+ @Override
+ public Metrics metrics() {
+ return writer.metrics();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ private static FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
+ try {
+ return Avro.write(file)
+ .schema(ManifestFile.schema())
+ .named("manifest_file")
+ .meta(meta)
+ .build();
+
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file);
+ }
+ }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index 28ba83158af6..a59c1009d2c9 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -19,6 +19,7 @@
package com.netflix.iceberg;
+import com.google.common.base.Preconditions;
import com.netflix.iceberg.avro.Avro;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.FileAppender;
@@ -35,14 +36,27 @@
class ManifestWriter implements FileAppender<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
+ private final String location;
+ private final OutputFile file;
+ private final int specId;
private final FileAppender<ManifestEntry> writer;
private final long snapshotId;
- private ManifestEntry reused = null;
+ private final ManifestEntry reused;
+ private final PartitionSummary stats;
+
+ private boolean closed = false;
+ private int addedFiles = 0;
+ private int existingFiles = 0;
+ private int deletedFiles = 0;
ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) {
+ this.location = file.location();
+ this.file = file;
+ this.specId = spec.specId();
this.writer = newAppender(FileFormat.AVRO, spec, file);
this.snapshotId = snapshotId;
this.reused = new ManifestEntry(spec.partitionType());
+ this.stats = new PartitionSummary(spec);
}
public void addExisting(Iterable<ManifestEntry> entries) {
@@ -54,25 +68,37 @@ public void addExisting(Iterable<ManifestEntry> entries) {
}
public void addExisting(ManifestEntry entry) {
- writer.add(reused.wrapExisting(entry.snapshotId(), entry.file()));
+ add(reused.wrapExisting(entry.snapshotId(), entry.file()));
}
public void addExisting(long snapshotId, DataFile file) {
- writer.add(reused.wrapExisting(snapshotId, file));
+ add(reused.wrapExisting(snapshotId, file));
}
public void delete(ManifestEntry entry) {
// Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
// when this Snapshot has been removed or when there are no Snapshots older than this one.
- writer.add(reused.wrapDelete(snapshotId, entry.file()));
+ add(reused.wrapDelete(snapshotId, entry.file()));
}
public void delete(DataFile file) {
- writer.add(reused.wrapDelete(snapshotId, file));
+ add(reused.wrapDelete(snapshotId, file));
}
- public void add(ManifestEntry file) {
- writer.add(file);
+ public void add(ManifestEntry entry) {
+ switch (entry.status()) {
+ case ADDED:
+ addedFiles += 1;
+ break;
+ case EXISTING:
+ existingFiles += 1;
+ break;
+ case DELETED:
+ deletedFiles += 1;
+ break;
+ }
+ stats.update(entry.file().partition());
+ writer.add(entry);
}
public void addEntries(Iterable<ManifestEntry> entries) {
@@ -85,7 +111,7 @@ public void addEntries(Iterable<ManifestEntry> entries) {
public void add(DataFile file) {
// TODO: this assumes that file is a GenericDataFile that can be written directly to Avro
// Eventually, this should check in case there are other DataFile implementations.
- writer.add(reused.wrapAppend(snapshotId, file));
+ add(reused.wrapAppend(snapshotId, file));
}
@Override
@@ -93,8 +119,15 @@ public Metrics metrics() {
return writer.metrics();
}
+ public ManifestFile toManifestFile() {
+ Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
+ return new GenericManifestFile(location, file.toInputFile().getLength(), specId, snapshotId,
+ addedFiles, existingFiles, deletedFiles, stats.summaries());
+ }
+
@Override
public void close() throws IOException {
+ this.closed = true;
writer.close();
}
diff --git a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
index f12ba042711f..8878d4c0ec4b 100644
--- a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
@@ -25,7 +25,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.io.Closeables;
import com.netflix.iceberg.ManifestEntry.Status;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.exceptions.ValidationException;
@@ -34,7 +33,6 @@
import com.netflix.iceberg.expressions.Expressions;
import com.netflix.iceberg.expressions.Projections;
import com.netflix.iceberg.expressions.StrictMetricsEvaluator;
-import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.util.BinPacking.ListPacker;
import com.netflix.iceberg.util.CharSequenceWrapper;
@@ -42,15 +40,12 @@
import com.netflix.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Array;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Iterables.filter;
@@ -64,7 +59,6 @@
abstract class MergingSnapshotUpdate extends SnapshotUpdate {
private final Logger LOG = LoggerFactory.getLogger(getClass());
- private static final long SIZE_PER_FILE = 100; // assume each file will be ~100 bytes
private static final Joiner COMMA = Joiner.on(",");
protected static class DeleteException extends ValidationException {
@@ -94,14 +88,18 @@ public String partition() {
private boolean failAnyDelete = false;
private boolean failMissingDeletePaths = false;
+ // cache the new manifest once it is written
+ private ManifestFile newManifest = null;
+ private boolean hasNewFiles = false;
+
// cache merge results to reuse when retrying
- private final Map<List<String>, String> mergeManifests = Maps.newConcurrentMap();
+ private final Map<List<ManifestFile>, ManifestFile> mergeManifests = Maps.newConcurrentMap();
// cache filtered manifests to avoid extra work when commits fail.
- private final Map<String, ManifestReader> filteredManifests = Maps.newConcurrentMap();
+ private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
// tracking where files were deleted to validate retries quickly
- private final Map<String, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
+ private final Map<ManifestFile, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();
private boolean filterUpdated = false; // used to clear caches of filtered and merged manifests
@@ -169,77 +167,70 @@ protected void delete(CharSequence path) {
* Add a file to the new snapshot.
*/
protected void add(DataFile file) {
+ hasNewFiles = true;
newFiles.add(file);
}
@Override
- public List<String> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base) {
if (filterUpdated) {
- cleanUncommittedFilters(EMPTY_SET);
+ cleanUncommittedFilters(SnapshotUpdate.EMPTY_SET);
this.filterUpdated = false;
}
Snapshot current = base.currentSnapshot();
- List<PartitionSpec> specs = Lists.newArrayList();
- List<List<ManifestReader>> groups = Lists.newArrayList();
+ Map<Integer, List<ManifestFile>> groups = Maps.newTreeMap(Comparator.reverseOrder());
// use a common metrics evaluator for all manifests because it is bound to the table schema
StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(
ops.current().schema(), deleteExpression);
// add the current spec as the first group. files are added to the beginning.
- if (newFiles.size() > 0) {
- specs.add(spec);
- groups.add(Lists.newArrayList());
- groups.get(0).add(newFilesAsManifest());
- }
-
- ConcurrentLinkedQueue<ManifestReader> toClose = new ConcurrentLinkedQueue<>();
- boolean threw = true;
try {
+ if (newFiles.size() > 0) {
+ ManifestFile newManifest = newFilesAsManifest();
+ List<ManifestFile> manifestGroup = Lists.newArrayList();
+ manifestGroup.add(newManifest);
+ groups.put(newManifest.partitionSpecId(), manifestGroup);
+ }
+
Set<CharSequenceWrapper> deletedFiles = Sets.newHashSet();
// group manifests by compatible partition specs to be merged
if (current != null) {
- List<String> manifests = current.manifests();
- ManifestReader[] readers = new ManifestReader[manifests.size()];
+ List<ManifestFile> manifests = current.manifests();
+ ManifestFile[] filtered = new ManifestFile[manifests.size()];
// open all of the manifest files in parallel, use index to avoid reordering
- Tasks.range(readers.length)
+ Tasks.range(filtered.length)
.stopOnFailure().throwFailureWhenFinished()
.executeWith(getWorkerPool())
.run(index -> {
- ManifestReader manifest = filterManifest(
- deleteExpression, metricsEvaluator, ops.newInputFile(manifests.get(index)));
- readers[index] = manifest;
- toClose.add(manifest);
- });
-
- for (ManifestReader reader : readers) {
- if (reader.file() != null) {
- String location = reader.file().location();
- Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(location);
- if (manifestDeletes != null) {
- deletedFiles.addAll(manifestDeletes);
- }
+ ManifestFile manifest = filterManifest(
+ deleteExpression, metricsEvaluator,
+ manifests.get(index));
+ filtered[index] = manifest;
+ }, IOException.class);
+
+ for (ManifestFile manifest : filtered) {
+ Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+ if (manifestDeletes != null) {
+ deletedFiles.addAll(manifestDeletes);
}
- int index = findMatch(specs, reader.spec());
- if (index < 0) {
- // not found, add a new one
- List<ManifestReader> newList = Lists.newArrayList(reader);
- specs.add(reader.spec());
- groups.add(newList);
+ List<ManifestFile> group = groups.get(manifest.partitionSpecId());
+ if (group != null) {
+ group.add(manifest);
} else {
- // replace the reader spec with the later one
- specs.set(index, reader.spec());
- groups.get(index).add(reader);
+ group = Lists.newArrayList();
+ group.add(manifest);
+ groups.put(manifest.partitionSpecId(), group);
}
}
}
- List<String> manifests = Lists.newArrayList();
- for (int i = 0; i < specs.size(); i += 1) {
- for (String manifest : mergeGroup(specs.get(i), groups.get(i))) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ for (Map.Entry<Integer, List<ManifestFile>> entry : groups.entrySet()) {
+ for (ManifestFile manifest : mergeGroup(entry.getKey(), entry.getValue())) {
manifests.add(manifest);
}
}
@@ -250,52 +241,56 @@ public List<String> apply(TableMetadata base) {
path -> !deletedFiles.contains(path)),
CharSequenceWrapper::get)));
- threw = false;
-
return manifests;
- } finally {
- for (ManifestReader reader : toClose) {
- try {
- Closeables.close(reader, threw);
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
- }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to create snapshot manifest list");
}
}
- private void cleanUncommittedMerges(Set<String> committed) {
- List<Map.Entry<List<String>, String>> entries = Lists.newArrayList(mergeManifests.entrySet());
- for (Map.Entry<List<String>, String> entry : entries) {
+ private void cleanUncommittedMerges(Set<ManifestFile> committed) {
+ // iterate over a copy of entries to avoid concurrent modification
+ List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+ Lists.newArrayList(mergeManifests.entrySet());
+
+ for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
// delete any new merged manifests that aren't in the committed list
- String merged = entry.getValue();
+ ManifestFile merged = entry.getValue();
if (!committed.contains(merged)) {
- deleteFile(merged);
+ deleteFile(merged.path());
// remove the deleted file from the cache
mergeManifests.remove(entry.getKey());
}
}
}
- private void cleanUncommittedFilters(Set<String> committed) {
- List<Map.Entry<String, ManifestReader>> filterEntries = Lists.newArrayList(filteredManifests.entrySet());
- for (Map.Entry<String, ManifestReader> entry : filterEntries) {
+ private void cleanUncommittedFilters(Set<ManifestFile> committed) {
+ // iterate over a copy of entries to avoid concurrent modification
+ List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
+ Lists.newArrayList(filteredManifests.entrySet());
+
+ for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
// remove any new filtered manifests that aren't in the committed list
- String manifest = entry.getKey();
- ManifestReader filtered = entry.getValue();
- if (filtered != null) {
- String location = filtered.file().location();
- if (!manifest.equals(location) && !committed.contains(location)) {
- filteredManifests.remove(manifest);
- deleteFile(location);
+ ManifestFile manifest = entry.getKey();
+ ManifestFile filtered = entry.getValue();
+ if (!committed.contains(filtered)) {
+ // only delete if the filtered copy was created
+ if (!manifest.equals(filtered)) {
+ deleteFile(filtered.path());
}
+
+ // remove the entry from the cache
+ filteredManifests.remove(manifest);
}
}
}
@Override
- protected void cleanUncommitted(Set<String> committed) {
+ protected void cleanUncommitted(Set<ManifestFile> committed) {
+ if (newManifest != null && !committed.contains(newManifest)) {
+ deleteFile(newManifest.path());
+ this.newManifest = null;
+ }
cleanUncommittedMerges(committed);
cleanUncommittedFilters(committed);
}
@@ -308,22 +303,20 @@ private boolean nothingToFilter() {
/**
* @return a ManifestReader that is a filtered version of the input manifest.
*/
- private ManifestReader filterManifest(Expression deleteExpression,
+ private ManifestFile filterManifest(Expression deleteExpression,
StrictMetricsEvaluator metricsEvaluator,
- InputFile manifest) {
- ManifestReader cached = filteredManifests.get(manifest.location());
+ ManifestFile manifest) throws IOException {
+ ManifestFile cached = filteredManifests.get(manifest);
if (cached != null) {
return cached;
}
- ManifestReader reader = ManifestReader.read(manifest);
-
if (nothingToFilter()) {
- filteredManifests.put(manifest.location(), reader);
- return reader;
+ filteredManifests.put(manifest, manifest);
+ return manifest;
}
- try {
+ try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
Expression inclusiveExpr = Projections
.inclusive(reader.spec())
.project(deleteExpression);
@@ -344,42 +337,35 @@ private ManifestReader filterManifest(Expression deleteExpression,
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = false;
- Iterator<ManifestEntry> entries = reader.entries().iterator();
- try {
- while (entries.hasNext()) {
- ManifestEntry entry = entries.next();
- DataFile file = entry.file();
- boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
- dropPartitions.contains(partitionWrapper.set(file.partition())));
- if (fileDelete || inclusive.eval(file.partition())) {
- ValidationException.check(
- fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
- "Cannot delete file where some, but not all, rows match filter %s: %s",
- this.deleteExpression, file.path());
-
- hasDeletedFiles = true;
- if (failAnyDelete) {
- throw new DeleteException(writeSpec().partitionToPath(file.partition()));
- }
- break; // as soon as a deleted file is detected, stop scanning
+ for (ManifestEntry entry : reader.entries()) {
+ DataFile file = entry.file();
+ boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
+ dropPartitions.contains(partitionWrapper.set(file.partition())));
+ if (fileDelete || inclusive.eval(file.partition())) {
+ ValidationException.check(
+ fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
+ "Cannot delete file where some, but not all, rows match filter %s: %s",
+ this.deleteExpression, file.path());
+
+ hasDeletedFiles = true;
+ if (failAnyDelete) {
+ throw new DeleteException(writeSpec().partitionToPath(file.partition()));
}
- }
- } finally {
- // the loop may have exited early. ensure the iterator is closed.
- if (entries instanceof Closeable) {
- ((Closeable) entries).close();
+ break; // as soon as a deleted file is detected, stop scanning
}
}
if (!hasDeletedFiles) {
- return reader;
+ filteredManifests.put(manifest, manifest);
+ return manifest;
}
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
- try (ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId())) {
+ ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
+ try {
for (ManifestEntry entry : reader.entries()) {
DataFile file = entry.file();
boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
@@ -396,7 +382,7 @@ private ManifestReader filterManifest(Expression deleteExpression,
CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
LOG.warn("Deleting a duplicate path from manifest {}: {}",
- manifest.location(), wrapper.get());
+ manifest.path(), wrapper.get());
}
deletedPaths.add(wrapper);
@@ -405,96 +391,79 @@ private ManifestReader filterManifest(Expression deleteExpression,
}
}
}
+ } finally {
+ writer.close();
}
- // close the reader now that it is no longer used and will not be returned
- reader.close();
-
// return the filtered manifest as a reader
- ManifestReader filtered = ManifestReader.read(ops.newInputFile(filteredCopy.location()));
+ ManifestFile filtered = writer.toManifestFile();
// update caches
- filteredManifests.put(manifest.location(), filtered);
- filteredManifestToDeletedFiles.put(filteredCopy.location(), deletedPaths);
+ filteredManifests.put(manifest, filtered);
+ filteredManifestToDeletedFiles.put(filtered, deletedPaths);
return filtered;
-
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to filter manifest: %s", reader.file().location());
}
}
@SuppressWarnings("unchecked")
- private Iterable<String> mergeGroup(PartitionSpec groupSpec, List<ManifestReader> group) {
+ private Iterable<ManifestFile> mergeGroup(int specId, List<ManifestFile> group)
+ throws IOException {
// use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
// from the end so that the manifest that gets under-filled is the first one, which will be
// merged the next time.
- long newFilesSize = newFiles.size() * SIZE_PER_FILE;
- ListPacker<ManifestReader> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
- List<List<ManifestReader>> bins = packer.packEnd(group,
- reader -> reader.file() != null ? reader.file().getLength() : newFilesSize);
+ ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
+ List<List<ManifestFile>> bins = packer.packEnd(group, manifest -> manifest.length());
// process bins in parallel, but put results in the order of the bins into an array to preserve
// the order of manifests and contents. preserving the order helps avoid random deletes when
// data files are eventually aged off.
- List<String>[] binResults = (List<String>[]) Array.newInstance(List.class, bins.size());
+ List<ManifestFile>[] binResults = (List<ManifestFile>[])
+ Array.newInstance(List.class, bins.size());
Tasks.range(bins.size())
.stopOnFailure().throwFailureWhenFinished()
.executeWith(getWorkerPool())
.run(index -> {
- List<ManifestReader> bin = bins.get(index);
- List<String> outputManifests = Lists.newArrayList();
+ List<ManifestFile> bin = bins.get(index);
+ List<ManifestFile> outputManifests = Lists.newArrayList();
binResults[index] = outputManifests;
- if (bin.size() == 1 && bin.get(0).file() != null) {
+ if (bin.size() == 1) {
// no need to rewrite
- outputManifests.add(bin.get(0).file().location());
+ outputManifests.add(bin.get(0));
return;
}
- boolean hasInMemoryManifest = false;
- for (ManifestReader reader : bin) {
- if (reader.file() == null) {
- hasInMemoryManifest = true;
- }
- }
-
- // if the bin has an in-memory manifest (the new data) then only merge it if the number of
+ // if the bin has a new manifest (the new data files) then only merge it if the number of
// manifests is above the minimum count. this is applied only to bins with an in-memory
// manifest so that large manifests don't prevent merging older groups.
- if (hasInMemoryManifest && bin.size() < minManifestsCountToMerge) {
- for (ManifestReader reader : bin) {
- if (reader.file() != null) {
- outputManifests.add(reader.file().location());
- } else {
- // write the in-memory manifest
- outputManifests.add(createManifest(groupSpec, Collections.singletonList(reader)));
- }
- }
+ if (bin.contains(newManifest) && bin.size() < minManifestsCountToMerge) {
+ // not enough to merge, add all manifest files to the output list
+ outputManifests.addAll(bin);
} else {
- outputManifests.add(createManifest(groupSpec, bin));
+ // merge the group
+ outputManifests.add(createManifest(specId, bin));
}
- });
+ }, IOException.class);
return Iterables.concat(binResults);
}
- // NOTE: This assumes that any files that are added are in an in-memory manifest.
- private String createManifest(PartitionSpec binSpec, List<ManifestReader> bin) {
- List<String> key = cacheKey(bin);
+ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws IOException {
// if this merge was already rewritten, use the existing file.
- // if the new files are in this merge, the key is based on the number of new files so files
- // added after the last merge will cause a cache miss.
- if (mergeManifests.containsKey(key)) {
- return mergeManifests.get(key);
+ // if the new files are in this merge, then the ManifestFile for the new files has changed and
+ // will be a cache miss.
+ if (mergeManifests.containsKey(bin)) {
+ return mergeManifests.get(bin);
}
OutputFile out = manifestPath(manifestCount.getAndIncrement());
- try (ManifestWriter writer = new ManifestWriter(binSpec, out, snapshotId())) {
+ ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId());
+ try {
- for (ManifestReader reader : bin) {
- if (reader.file() != null) {
+ for (ManifestFile manifest : bin) {
+ try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
for (ManifestEntry entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
@@ -502,72 +471,49 @@ private String createManifest(PartitionSpec binSpec, List<ManifestReader> bin) {
if (entry.snapshotId() == snapshotId()) {
writer.add(entry);
}
+ } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
+ // adds from this snapshot are still adds, otherwise they should be existing
+ writer.add(entry);
} else {
// add all files from the old manifest as existing files
writer.addExisting(entry);
}
}
- } else {
- // if the files are in an in-memory manifest, then they are new
- writer.addEntries(reader.entries());
}
}
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to write manifest: %s", out);
+ } finally {
+ writer.close();
}
- // update the cache
- mergeManifests.put(key, out.location());
+ ManifestFile manifest = writer.toManifestFile();
- return out.location();
- }
+ // update the cache
+ mergeManifests.put(bin, manifest);
- private ManifestReader newFilesAsManifest() {
- long id = snapshotId();
- ManifestEntry reused = new ManifestEntry(spec.partitionType());
- return ManifestReader.inMemory(spec,
- transform(newFiles, file -> {
- reused.wrapAppend(id, file);
- return reused;
- }));
+ return manifest;
}
- private List<String> cacheKey(List<ManifestReader> group) {
- List<String> key = Lists.newArrayList();
-
- for (ManifestReader reader : group) {
- if (reader.file() != null) {
- key.add(reader.file().location());
- } else {
- // if the file is null, this is an in-memory reader
- // use the size to avoid collisions if retries have added files
- key.add("append-" + newFiles.size() + "-files");
- }
+ private ManifestFile newFilesAsManifest() throws IOException {
+ if (hasNewFiles && newManifest != null) {
+ deleteFile(newManifest.path());
+ newManifest = null;
}
- return key;
- }
+ if (newManifest == null) {
+ OutputFile out = manifestPath(manifestCount.getAndIncrement());
- /**
- * Helper method to group manifests by compatible partition spec.
- *
- * When a match is found, this will replace the current spec for the group with the query spec.
- * This is to produce manifests with the latest compatible spec.
- *
- * @param specs a list of partition specs, corresponding to the groups of readers
- * @param spec spec to be matched to a group
- * @return group of readers files for this spec can be merged into
- */
- private static int findMatch(List<PartitionSpec> specs,
- PartitionSpec spec) {
- // loop from last to first because later specs are most likely to match
- for (int i = specs.size() - 1; i >= 0; i -= 1) {
- if (specs.get(i).compatibleWith(spec)) {
- return i;
+ ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+ try {
+ writer.addAll(newFiles);
+ } finally {
+ writer.close();
}
+
+ this.newManifest = writer.toManifestFile();
+ this.hasNewFiles = false;
}
- return -1;
+ return newManifest;
}
}
diff --git a/core/src/main/java/com/netflix/iceberg/OverwriteData.java b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
index 404b4407fda9..3ebb7253a1c6 100644
--- a/core/src/main/java/com/netflix/iceberg/OverwriteData.java
+++ b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
@@ -52,7 +52,7 @@ public OverwriteFiles validateAddedFiles() {
}
@Override
- public List<String> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base) {
if (validateAddedFiles) {
PartitionSpec spec = writeSpec();
Expression rowFilter = rowFilter();
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSummary.java b/core/src/main/java/com/netflix/iceberg/PartitionSummary.java
new file mode 100644
index 000000000000..52a2b4a66de2
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSummary.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.types.Comparators;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Type;
+import com.netflix.iceberg.types.Types;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+class PartitionSummary {
+ private final PartitionFieldStats<?>[] fields;
+ private final Class<?>[] javaClasses;
+
+ PartitionSummary(PartitionSpec spec) {
+ this.javaClasses = spec.javaClasses();
+ this.fields = new PartitionFieldStats[javaClasses.length];
+ List<Types.NestedField> partitionFields = spec.partitionType().fields();
+ for (int i = 0; i < fields.length; i += 1) {
+ this.fields[i] = new PartitionFieldStats<>(partitionFields.get(i).type());
+ }
+ }
+
+ List<PartitionFieldSummary> summaries() {
+ return Lists.transform(Arrays.asList(fields), PartitionFieldStats::toSummary);
+ }
+
+ public void update(StructLike partitionKey) {
+ updateFields(partitionKey);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> void updateFields(StructLike key) {
+ for (int i = 0; i < javaClasses.length; i += 1) {
+ PartitionFieldStats<T> stats = (PartitionFieldStats<T>) fields[i];
+ Class<T> javaClass = (Class<T>) javaClasses[i];
+ stats.update(key.get(i, javaClass));
+ }
+ }
+
+ private static class PartitionFieldStats<T> {
+ private final Type type;
+ private final Comparator<T> comparator;
+
+ private boolean containsNull = false;
+ private T min = null;
+ private T max = null;
+
+ private PartitionFieldStats(Type type) {
+ this.type = type;
+ this.comparator = Comparators.forType(type.asPrimitiveType());
+ }
+
+ public PartitionFieldSummary toSummary() {
+ return new GenericPartitionFieldSummary(containsNull,
+ min != null ? Conversions.toByteBuffer(type, min) : null,
+ max != null ? Conversions.toByteBuffer(type, max) : null);
+ }
+
+ void update(T value) {
+ if (value == null) {
+ this.containsNull = true;
+ } else if (min == null) {
+ this.min = value;
+ this.max = value;
+ } else {
+ if (comparator.compare(value, min) < 0) {
+ this.min = value;
+ }
+ if (comparator.compare(max, value) < 0) {
+ this.max = value;
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
index b09767407f27..8f473b36d169 100644
--- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
@@ -130,22 +130,22 @@ public void commit() {
TableMetadata current = ops.refresh();
Set currentIds = Sets.newHashSet();
- Set<String> currentManifests = Sets.newHashSet();
+ Set<ManifestFile> currentManifests = Sets.newHashSet();
for (Snapshot snapshot : current.snapshots()) {
currentIds.add(snapshot.snapshotId());
currentManifests.addAll(snapshot.manifests());
}
- Set<String> allManifests = Sets.newHashSet(currentManifests);
+ Set<ManifestFile> allManifests = Sets.newHashSet(currentManifests);
Set<String> manifestsToDelete = Sets.newHashSet();
for (Snapshot snapshot : base.snapshots()) {
long snapshotId = snapshot.snapshotId();
if (!currentIds.contains(snapshotId)) {
// the snapshot was removed, find any manifests that are no longer needed
LOG.info("Removing snapshot: {}", snapshot);
- for (String manifest : snapshot.manifests()) {
+ for (ManifestFile manifest : snapshot.manifests()) {
if (!currentManifests.contains(manifest)) {
- manifestsToDelete.add(manifest);
+ manifestsToDelete.add(manifest.path());
allManifests.add(manifest);
}
}
@@ -161,7 +161,7 @@ public void commit() {
).run(manifest -> {
// even if the manifest is still used, it may contain files that can be deleted
// TODO: eliminate manifests with no deletes without scanning
- try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
+ try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
for (ManifestEntry entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
if (entry.status() == ManifestEntry.Status.DELETED &&
@@ -171,7 +171,7 @@ public void commit() {
}
}
} catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to read manifest: " + manifest);
+ throw new RuntimeIOException(e, "Failed to read manifest file: " + manifest.path());
}
});
diff --git a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
index 9d0db6cedf4e..4fb6aa8abd01 100644
--- a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
+++ b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
@@ -42,7 +42,7 @@ public ReplacePartitions validateAppendOnly() {
}
@Override
- public List<String> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base) {
if (writeSpec().fields().size() <= 0) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
index b19ab38d7b25..731578680d50 100644
--- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java
+++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
@@ -22,6 +22,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -90,11 +91,21 @@ private void addTimestampFilter(UnboundPredicate<Long> filter) {
timeFilters.add(filter);
}
+ public Builder after(String timestamp) {
+ Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
+ return after(tsLiteral.value() / 1000);
+ }
+
public Builder after(long timestampMillis) {
addTimestampFilter(Expressions.greaterThanOrEqual("timestamp_ms", timestampMillis));
return this;
}
+ public Builder before(String timestamp) {
+ Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
+ return before(tsLiteral.value() / 1000);
+ }
+
public Builder before(long timestampMillis) {
addTimestampFilter(Expressions.lessThanOrEqual("timestamp_ms", timestampMillis));
return this;
@@ -145,29 +156,45 @@ public Map<String, PartitionMetrics> build() {
removeTimeFilters(filters, Expressions.rewriteNot(scan.filter()));
Expression rowFilter = joinFilters(filters);
- long minTimestamp = Long.MIN_VALUE;
- long maxTimestamp = Long.MAX_VALUE;
+ Iterable<ManifestFile> manifests = table.currentSnapshot().manifests();
+
boolean filterByTimestamp = !timeFilters.isEmpty();
+ Set<Long> snapshotsInTimeRange = Sets.newHashSet();
if (filterByTimestamp) {
Pair<Long, Long> range = timestampRange(timeFilters);
- minTimestamp = range.first();
- maxTimestamp = range.second();
+ long minTimestamp = range.first();
+ long maxTimestamp = range.second();
+
+ for (Map.Entry<Long, Long> entry : snapshotTimestamps.entrySet()) {
+ long snapshotId = entry.getKey();
+ long timestamp = entry.getValue();
+ if (timestamp >= minTimestamp && timestamp <= maxTimestamp) {
+ snapshotsInTimeRange.add(snapshotId);
+ }
+ }
+
+ // when filtering by dateCreated or lastUpdated timestamp, this matches the set of files
+ // that were added in the time range. files are added in new snapshots, so to get the new
+ // files, this only needs to scan new manifests in the set of snapshots that match the
+ // filter. ManifestFile.snapshotId() returns the snapshot when the manifest was added, so
+ // the only manifests that need to be scanned are those with snapshotId() in the timestamp
+ // range, or those that don't have a snapshot ID.
+ manifests = Iterables.filter(manifests, manifest ->
+ manifest.snapshotId() == null || snapshotsInTimeRange.contains(manifest.snapshotId()));
}
- try (CloseableIterable<ManifestEntry> entries =
- new ManifestGroup(ops, table.currentSnapshot().manifests())
- .filterData(rowFilter)
- .ignoreDeleted()
- .select(SCAN_SUMMARY_COLUMNS)
- .entries()) {
+ try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, manifests)
+ .filterData(rowFilter)
+ .ignoreDeleted()
+ .select(SCAN_SUMMARY_COLUMNS)
+ .entries()) {
PartitionSpec spec = table.spec();
for (ManifestEntry entry : entries) {
Long timestamp = snapshotTimestamps.get(entry.snapshotId());
// if filtering, skip timestamps that are outside the range
- if (filterByTimestamp &&
- (timestamp == null || timestamp < minTimestamp || timestamp > maxTimestamp)) {
+ if (filterByTimestamp && !snapshotsInTimeRange.contains(entry.snapshotId())) {
continue;
}
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
index cf04becfbbf2..a5ce08c527ba 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
@@ -22,8 +22,12 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.util.JsonUtil;
+import com.netflix.iceberg.util.Tasks;
+import com.netflix.iceberg.util.ThreadPools;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
@@ -34,19 +38,30 @@ public class SnapshotParser {
private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
private static final String TIMESTAMP_MS = "timestamp-ms";
private static final String MANIFESTS = "manifests";
+ private static final String MANIFEST_LIST = "manifest-list";
- static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
+ static void toJson(Snapshot snapshot, JsonGenerator generator)
+ throws IOException {
generator.writeStartObject();
generator.writeNumberField(SNAPSHOT_ID, snapshot.snapshotId());
if (snapshot.parentId() != null) {
generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
}
generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis());
- generator.writeArrayFieldStart(MANIFESTS);
- for (String file : snapshot.manifests()) {
- generator.writeString(file);
+
+ String manifestList = snapshot.manifestListLocation();
+ if (manifestList != null) {
+ // write just the location. manifests should not be embedded in JSON along with a list
+ generator.writeStringField(MANIFEST_LIST, manifestList);
+ } else {
+ // embed the manifest list in the JSON
+ generator.writeArrayFieldStart(MANIFESTS);
+ for (ManifestFile file : snapshot.manifests()) {
+ generator.writeString(file.path());
+ }
+ generator.writeEndArray();
}
- generator.writeEndArray();
+
generator.writeEndObject();
}
@@ -73,9 +88,19 @@ static Snapshot fromJson(TableOperations ops, JsonNode node) {
parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node);
}
long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node);
- List<String> manifests = JsonUtil.getStringList(MANIFESTS, node);
- return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+ if (node.has(MANIFEST_LIST)) {
+ // the manifest list is stored in a manifest list file
+ String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
+ return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.newInputFile(manifestList));
+
+ } else {
+ // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
+ // loaded lazily, if it is needed
+ List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
+ location -> new GenericManifestFile(ops.newInputFile(location), 0));
+ return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+ }
}
public static Snapshot fromJson(TableOperations ops, String json) {
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
index 8fe0d81015a9..54c0483d2fa9 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
@@ -19,13 +19,19 @@
package com.netflix.iceberg;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.util.Exceptions;
import com.netflix.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -39,10 +45,28 @@
import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED;
+import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED_DEFAULT;
+import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotUpdate.class);
- static final Set<String> EMPTY_SET = Sets.newHashSet();
+ static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
+
+ /**
+ * Cache used to enrich ManifestFile instances that are written to a ManifestListWriter.
+ */
+ private final LoadingCache<ManifestFile, ManifestFile> manifestsWithMetadata = CacheBuilder
+ .newBuilder()
+ .build(new CacheLoader<ManifestFile, ManifestFile>() {
+ @Override
+ public ManifestFile load(ManifestFile file) {
+ if (file.snapshotId() != null) {
+ return file;
+ }
+ return addMetadata(ops, file);
+ }
+ });
private final TableOperations ops;
private final String commitUUID = UUID.randomUUID().toString();
@@ -60,7 +84,7 @@ protected SnapshotUpdate(TableOperations ops) {
* @param base the base table metadata to apply changes to
* @return a manifest list for the new snapshot.
*/
- protected abstract List<String> apply(TableMetadata base);
+ protected abstract List<ManifestFile> apply(TableMetadata base);
/**
* Clean up any uncommitted manifests that were created.
@@ -72,16 +96,48 @@ protected SnapshotUpdate(TableOperations ops) {
*
* @param committed a set of manifest paths that were actually committed
*/
- protected abstract void cleanUncommitted(Set<String> committed);
+ protected abstract void cleanUncommitted(Set<ManifestFile> committed);
@Override
public Snapshot apply() {
this.base = ops.refresh();
- List<String> manifests = apply(base);
- Long currentSnapshotId = base.currentSnapshot() != null ?
+ Long parentSnapshotId = base.currentSnapshot() != null ?
base.currentSnapshot().snapshotId() : null;
- return new BaseSnapshot(ops,
- snapshotId(), currentSnapshotId, System.currentTimeMillis(), manifests);
+
+ List<ManifestFile> manifests = apply(base);
+
+ if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
+ OutputFile manifestList = manifestListPath();
+
+ try (ManifestListWriter writer = new ManifestListWriter(
+ manifestList, snapshotId(), parentSnapshotId)) {
+ ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
+
+ Tasks.range(manifestFiles.length)
+ .stopOnFailure().throwFailureWhenFinished()
+ .retry(4).exponentialBackoff(
+ base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */ )
+ .executeWith(getWorkerPool())
+ .run(index ->
+ manifestFiles[index] = manifestsWithMetadata.getUnchecked(manifests.get(index)));
+
+ writer.addAll(Arrays.asList(manifestFiles));
+
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to write manifest list file");
+ }
+
+ return new BaseSnapshot(ops,
+ snapshotId(), parentSnapshotId, System.currentTimeMillis(),
+ ops.newInputFile(manifestList.location()));
+
+ } else {
+ return new BaseSnapshot(ops,
+ snapshotId(), parentSnapshotId, System.currentTimeMillis(), manifests);
+ }
}
@Override
@@ -123,7 +179,7 @@ public void commit() {
}
} catch (RuntimeException e) {
- LOG.info("Failed to load committed table metadata, skipping manifest clean-up");
+ LOG.info("Failed to load committed table metadata, skipping manifest clean-up", e);
}
}
@@ -135,6 +191,11 @@ protected void deleteFile(String path) {
ops.deleteFile(path);
}
+ protected OutputFile manifestListPath() {
+ return ops.newMetadataFile(FileFormat.AVRO.addExtension(
+ String.format("snap-%d-%s", snapshotId(), commitUUID)));
+ }
+
protected OutputFile manifestPath(int i) {
return ops.newMetadataFile(FileFormat.AVRO.addExtension(commitUUID + "-m" + i));
}
@@ -145,4 +206,52 @@ protected long snapshotId() {
}
return snapshotId;
}
+
+ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
+ try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+ PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
+ int addedFiles = 0;
+ int existingFiles = 0;
+ int deletedFiles = 0;
+
+ Long snapshotId = null;
+ long maxSnapshotId = Long.MIN_VALUE;
+ for (ManifestEntry entry : reader.entries()) {
+ if (entry.snapshotId() > maxSnapshotId) {
+ maxSnapshotId = entry.snapshotId();
+ }
+
+ switch (entry.status()) {
+ case ADDED:
+ addedFiles += 1;
+ if (snapshotId == null) {
+ snapshotId = entry.snapshotId();
+ }
+ break;
+ case EXISTING:
+ existingFiles += 1;
+ break;
+ case DELETED:
+ deletedFiles += 1;
+ if (snapshotId == null) {
+ snapshotId = entry.snapshotId();
+ }
+ break;
+ }
+
+ stats.update(entry.file().partition());
+ }
+
+ if (snapshotId == null) {
+ // if no files were added or deleted, use the largest snapshot ID in the manifest
+ snapshotId = maxSnapshotId;
+ }
+
+ return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(),
+ snapshotId, addedFiles, existingFiles, deletedFiles, stats.summaries());
+
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
+ }
+ }
}
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 05c3392c1f2c..c949f13a74c1 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -221,6 +221,14 @@ public Map<String, String> properties() {
return properties;
}
+ public boolean propertyAsBoolean(String property, boolean defaultValue) {
+ String value = properties.get(property);
+ if (value != null) {
+ return Boolean.parseBoolean(value);
+ }
+ return defaultValue;
+ }
+
public int propertyAsInt(String property, int defaultValue) {
String value = properties.get(property);
if (value != null) {
diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index 6ca09a5475a4..e522f849f301 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -66,4 +66,7 @@ public class TableProperties {
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
public static final String OBJECT_STORE_PATH = "write.object-storage.path";
+
+ public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
+ public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
}
diff --git a/core/src/main/java/com/netflix/iceberg/avro/Avro.java b/core/src/main/java/com/netflix/iceberg/avro/Avro.java
index d58bfbd59b25..b08b5ff1b73d 100644
--- a/core/src/main/java/com/netflix/iceberg/avro/Avro.java
+++ b/core/src/main/java/com/netflix/iceberg/avro/Avro.java
@@ -22,7 +22,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.netflix.iceberg.SchemaParser;
-import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import org.apache.avro.Conversions;
@@ -118,6 +117,11 @@ public WriteBuilder meta(String property, String value) {
return this;
}
+ public WriteBuilder meta(Map<String, String> properties) {
+ metadata.putAll(properties);
+ return this;
+ }
+
private CodecFactory codec() {
String codec = config.getOrDefault(AVRO_COMPRESSION, AVRO_COMPRESSION_DEFAULT);
try {
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
index 9edea0d2d50d..2cf23ce22331 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
@@ -21,6 +21,7 @@
import com.netflix.iceberg.exceptions.AlreadyExistsException;
import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.io.PositionOutputStream;
import org.apache.hadoop.conf.Configuration;
@@ -80,6 +81,11 @@ public String location() {
return path.toString();
}
+ @Override
+ public InputFile toInputFile() {
+ return HadoopInputFile.fromPath(path, conf);
+ }
+
@Override
public String toString() {
return location();
diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
new file mode 100644
index 000000000000..27a01fc49f3c
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.io.OutputFile;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static com.netflix.iceberg.Files.localInput;
+
+class LocalTableOperations implements TableOperations {
+ private final TemporaryFolder temp;
+
+ LocalTableOperations(TemporaryFolder temp) {
+ this.temp = temp;
+ }
+
+ @Override
+ public TableMetadata current() {
+ throw new UnsupportedOperationException("Not implemented for tests");
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ throw new UnsupportedOperationException("Not implemented for tests");
+ }
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata metadata) {
+ throw new UnsupportedOperationException("Not implemented for tests");
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return localInput(path);
+ }
+
+ @Override
+ public OutputFile newMetadataFile(String filename) {
+ try {
+ File metadataFile = temp.newFile(filename);
+ metadataFile.delete();
+ metadataFile.deleteOnExit();
+ return Files.localOutput(metadataFile);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ new File(path).delete();
+ }
+
+ @Override
+ public long newSnapshotId() {
+ throw new UnsupportedOperationException("Not implemented for tests");
+ }
+}
diff --git a/core/src/test/java/com/netflix/iceberg/TableTestBase.java b/core/src/test/java/com/netflix/iceberg/TableTestBase.java
index cafb440b449f..c723daa14568 100644
--- a/core/src/test/java/com/netflix/iceberg/TableTestBase.java
+++ b/core/src/test/java/com/netflix/iceberg/TableTestBase.java
@@ -120,23 +120,23 @@ TableMetadata readMetadata() {
}
void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
- List<String> oldManifests = old != null ? old.manifests() : ImmutableList.of();
+ List<ManifestFile> oldManifests = old != null ? old.manifests() : ImmutableList.of();
// copy the manifests to a modifiable list and remove the existing manifests
- List<String> newManifests = Lists.newArrayList(snap.manifests());
- for (String oldManifest : oldManifests) {
+ List<ManifestFile> newManifests = Lists.newArrayList(snap.manifests());
+ for (ManifestFile oldManifest : oldManifests) {
Assert.assertTrue("New snapshot should contain old manifests",
newManifests.remove(oldManifest));
}
Assert.assertEquals("Should create 1 new manifest and reuse old manifests",
1, newManifests.size());
- String manifest = newManifests.get(0);
+ ManifestFile manifest = newManifests.get(0);
long id = snap.snapshotId();
Iterator<String> newPaths = paths(newFiles).iterator();
- for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) {
+ for (ManifestEntry entry : ManifestReader.read(localInput(manifest.path())).entries()) {
DataFile file = entry.file();
Assert.assertEquals("Path should match expected", newPaths.next(), file.path().toString());
Assert.assertEquals("File's snapshot ID should match", id, entry.snapshotId());
@@ -153,9 +153,15 @@ List<String> paths(DataFile... dataFiles) {
return paths;
}
+ static void validateManifest(ManifestFile manifest,
+ Iterator<Long> ids,
+ Iterator<DataFile> expectedFiles) {
+ validateManifest(manifest.path(), ids, expectedFiles);
+ }
+
static void validateManifest(String manifest,
- Iterator<Long> ids,
- Iterator<DataFile> expectedFiles) {
+ Iterator<Long> ids,
+ Iterator<DataFile> expectedFiles) {
for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) {
DataFile file = entry.file();
DataFile expected = expectedFiles.next();
@@ -168,6 +174,13 @@ static void validateManifest(String manifest,
Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
}
+ static void validateManifestEntries(ManifestFile manifest,
+ Iterator<Long> ids,
+ Iterator<DataFile> expectedFiles,
+ Iterator<ManifestEntry.Status> expectedStatuses) {
+ validateManifestEntries(manifest.path(), ids, expectedFiles, expectedStatuses);
+ }
+
static void validateManifestEntries(String manifest,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles,
@@ -199,7 +212,7 @@ static Iterator<DataFile> files(DataFile... files) {
return Iterators.forArray(files);
}
- static Iterator<DataFile> files(String manifest) {
- return ManifestReader.read(localInput(manifest)).iterator();
+ static Iterator<DataFile> files(ManifestFile manifest) {
+ return ManifestReader.read(localInput(manifest.path())).iterator();
}
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestFastAppend.java b/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
index 9cac989f1235..4d9e174df521 100644
--- a/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
@@ -54,7 +54,7 @@ public void testNonEmptyTableAppend() {
TableMetadata base = readMetadata();
Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
- List