From e04fa66ef5db635056d610307e0259fba03cef44 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sat, 22 Aug 2020 14:30:37 -0700 Subject: [PATCH 1/4] Add partition summaries in SnapshotSummary builder. --- .../org/apache/iceberg/SnapshotSummary.java | 254 +++++++++++++----- 1 file changed, 191 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index 4b7e48015cb7..c81fdca86b07 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -21,9 +21,10 @@ import java.util.Map; import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Joiner.MapJoiner; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; public class SnapshotSummary { public static final String ADDED_FILES_PROP = "added-data-files"; @@ -35,6 +36,8 @@ public class SnapshotSummary { public static final String ADDED_RECORDS_PROP = "added-records"; public static final String DELETED_RECORDS_PROP = "deleted-records"; public static final String TOTAL_RECORDS_PROP = "total-records"; + public static final String ADDED_FILE_SIZE_PROP = "added-files-size"; + public static final String REMOVED_FILE_SIZE_PROP = "removed-files-size"; public static final String ADDED_POS_DELETES_PROP = "added-position-deletes"; public static final String REMOVED_POS_DELETES_PROP = "removed-position-deletes"; public static final String TOTAL_POS_DELETES_PROP = "total-position-deletes"; @@ -43,12 +46,16 @@ public class SnapshotSummary { public static final String TOTAL_EQ_DELETES_PROP = "total-equality-deletes"; public static final String DELETED_DUPLICATE_FILES = "deleted-duplicate-files"; public static 
final String CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"; + public static final String CHANGED_PARTITION_PREFIX = "partitions."; + public static final String PARTITION_SUMMARY_PROP = "partition-summaries-included"; public static final String STAGED_WAP_ID_PROP = "wap.id"; public static final String PUBLISHED_WAP_ID_PROP = "published-wap-id"; public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id"; public static final String REPLACE_PARTITIONS_PROP = "replace-partitions"; public static final String EXTRA_METADATA_PREFIX = "snapshot-property."; + public static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("="); + private SnapshotSummary() { } @@ -58,27 +65,17 @@ public static Builder builder() { public static class Builder { // commit summary tracking - private Set changedPartitions = Sets.newHashSet(); - private long addedFiles = 0L; - private long deletedFiles = 0L; - private long addedDeleteFiles = 0L; - private long removedDeleteFiles = 0L; + private final Map properties = Maps.newHashMap(); + private final Map partitionMetrics = Maps.newHashMap(); + private final UpdateMetrics metrics = new UpdateMetrics(); private long deletedDuplicateFiles = 0L; - private long addedRecords = 0L; - private long deletedRecords = 0L; - private long addedPosDeletes = 0L; - private long removedPosDeletes = 0L; - private long addedEqDeletes = 0L; - private long removedEqDeletes = 0L; - private Map properties = Maps.newHashMap(); + private boolean trustPartitionMetrics = true; public void clear() { - changedPartitions.clear(); - this.addedFiles = 0L; - this.deletedFiles = 0L; + partitionMetrics.clear(); + metrics.clear(); this.deletedDuplicateFiles = 0L; - this.addedRecords = 0L; - this.deletedRecords = 0L; + this.trustPartitionMetrics = true; } public void incrementDuplicateDeletes() { @@ -90,19 +87,13 @@ public void incrementDuplicateDeletes(int increment) { } public void addedFile(PartitionSpec spec, DataFile file) { - 
changedPartitions.add(spec.partitionToPath(file.partition())); - this.addedFiles += 1; - this.addedRecords += file.recordCount(); + metrics.addedFile(file); + updatePartitions(spec, file, true); } public void addedFile(PartitionSpec spec, DeleteFile file) { - changedPartitions.add(spec.partitionToPath(file.partition())); - this.addedDeleteFiles += 1; - if (file.content() == FileContent.POSITION_DELETES) { - this.addedPosDeletes += file.recordCount(); - } else { - this.addedEqDeletes += file.recordCount(); - } + metrics.addedFile(file); + updatePartitions(spec, file, true); } public void deletedFile(PartitionSpec spec, ContentFile file) { @@ -116,43 +107,53 @@ public void deletedFile(PartitionSpec spec, ContentFile file) { } public void deletedFile(PartitionSpec spec, DataFile file) { - changedPartitions.add(spec.partitionToPath(file.partition())); - this.deletedFiles += 1; - this.deletedRecords += file.recordCount(); + metrics.removedFile(file); + updatePartitions(spec, file, false); } public void deletedFile(PartitionSpec spec, DeleteFile file) { - changedPartitions.add(spec.partitionToPath(file.partition())); - this.removedDeleteFiles += 1; - if (file.content() == FileContent.POSITION_DELETES) { - this.removedPosDeletes += file.recordCount(); - } else { - this.removedEqDeletes += file.recordCount(); - } + metrics.removedFile(file); + updatePartitions(spec, file, false); } public void addedManifest(ManifestFile manifest) { - this.addedFiles += manifest.addedFilesCount(); - this.addedRecords += manifest.addedRowsCount(); - } - - public void deletedManifest(ManifestFile manifest) { - this.deletedFiles += manifest.deletedFilesCount(); - this.deletedRecords += manifest.deletedRowsCount(); + this.trustPartitionMetrics = false; + partitionMetrics.clear(); + metrics.addedManifest(manifest); } public void set(String property, String value) { properties.put(property, value); } + private void updatePartitions(PartitionSpec spec, ContentFile file, boolean isAddition) { + if 
(trustPartitionMetrics) { + UpdateMetrics partMetrics = partitionMetrics.computeIfAbsent( + spec.partitionToPath(file.partition()), + key -> new UpdateMetrics()); + + if (isAddition) { + partMetrics.addedFile(file); + } else { + partMetrics.removedFile(file); + } + } + } + public void merge(SnapshotSummary.Builder builder) { - this.changedPartitions.addAll(builder.changedPartitions); - this.addedFiles += builder.addedFiles; - this.deletedFiles += builder.deletedFiles; + properties.putAll(builder.properties); + metrics.merge(builder.metrics); + + this.trustPartitionMetrics = trustPartitionMetrics && builder.trustPartitionMetrics; + if (trustPartitionMetrics) { + for (Map.Entry entry : builder.partitionMetrics.entrySet()) { + partitionMetrics.computeIfAbsent(entry.getKey(), key -> new UpdateMetrics()).merge(entry.getValue()); + } + } else { + partitionMetrics.clear(); + } + this.deletedDuplicateFiles += builder.deletedDuplicateFiles; - this.addedRecords += builder.addedRecords; - this.deletedRecords += builder.deletedRecords; - this.properties.putAll(builder.properties); } public Map build() { @@ -161,27 +162,154 @@ public Map build() { // copy custom summary properties builder.putAll(properties); + metrics.addTo(builder); + setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles); + Set changedPartitions = partitionMetrics.keySet(); + setIf(trustPartitionMetrics, builder, CHANGED_PARTITION_COUNT_PROP, changedPartitions.size()); + + if (trustPartitionMetrics && changedPartitions.size() < 100) { + setIf(true, builder, PARTITION_SUMMARY_PROP, "true"); + for (String key : changedPartitions) { + setIf(key != null, builder, CHANGED_PARTITION_PREFIX + key, partitionSummary(partitionMetrics.get(key))); + } + } + + return builder.build(); + } + + private static String partitionSummary(UpdateMetrics metrics) { + ImmutableMap.Builder partBuilder = ImmutableMap.builder(); + metrics.addTo(partBuilder); + return 
MAP_JOINER.join(partBuilder.build()); + } + } + + private static class UpdateMetrics { + private long addedSize = 0L; + private long removedSize = 0L; + private int addedFiles = 0; + private int removedFiles = 0; + private int addedDeleteFiles = 0; + private int removedDeleteFiles = 0; + private long addedRecords = 0L; + private long deletedRecords = 0L; + private long addedPosDeletes = 0L; + private long removedPosDeletes = 0L; + private long addedEqDeletes = 0L; + private long removedEqDeletes = 0L; + private boolean trustSizeAndDeleteCounts = true; + + void clear() { + this.addedSize = 0L; + this.removedSize = 0L; + this.addedFiles = 0; + this.removedFiles = 0; + this.addedDeleteFiles = 0; + this.removedDeleteFiles = 0; + this.addedRecords = 0L; + this.deletedRecords = 0L; + this.addedPosDeletes = 0L; + this.removedPosDeletes = 0L; + this.addedEqDeletes = 0L; + this.removedEqDeletes = 0L; + this.trustSizeAndDeleteCounts = true; + } + + void addTo(ImmutableMap.Builder builder) { setIf(addedFiles > 0, builder, ADDED_FILES_PROP, addedFiles); - setIf(deletedFiles > 0, builder, DELETED_FILES_PROP, deletedFiles); + setIf(removedFiles > 0, builder, DELETED_FILES_PROP, removedFiles); setIf(addedDeleteFiles > 0, builder, ADDED_DELETE_FILES_PROP, addedDeleteFiles); setIf(removedDeleteFiles > 0, builder, REMOVED_DELETE_FILES_PROP, removedDeleteFiles); - setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles); setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords); setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords); - setIf(addedPosDeletes > 0, builder, ADDED_POS_DELETES_PROP, addedPosDeletes); - setIf(removedPosDeletes > 0, builder, REMOVED_POS_DELETES_PROP, removedPosDeletes); - setIf(addedEqDeletes > 0, builder, ADDED_EQ_DELETES_PROP, addedEqDeletes); - setIf(removedEqDeletes > 0, builder, REMOVED_EQ_DELETES_PROP, removedEqDeletes); - setIf(true, builder, CHANGED_PARTITION_COUNT_PROP, 
changedPartitions.size()); - return builder.build(); + if (trustSizeAndDeleteCounts) { + setIf(addedSize > 0, builder, ADDED_FILE_SIZE_PROP, addedSize); + setIf(removedSize > 0, builder, REMOVED_FILE_SIZE_PROP, removedSize); + setIf(addedPosDeletes > 0, builder, ADDED_POS_DELETES_PROP, addedPosDeletes); + setIf(removedPosDeletes > 0, builder, REMOVED_POS_DELETES_PROP, removedPosDeletes); + setIf(addedEqDeletes > 0, builder, ADDED_EQ_DELETES_PROP, addedEqDeletes); + setIf(removedEqDeletes > 0, builder, REMOVED_EQ_DELETES_PROP, removedEqDeletes); + } + } + + void addedFile(ContentFile file) { + this.addedSize += file.fileSizeInBytes(); + switch (file.content()) { + case DATA: + this.addedFiles += 1; + this.addedRecords += file.recordCount(); + break; + case POSITION_DELETES: + this.addedDeleteFiles += 1; + this.addedPosDeletes += file.recordCount(); + break; + case EQUALITY_DELETES: + this.addedDeleteFiles += 1; + this.addedEqDeletes += file.recordCount(); + break; + default: + throw new UnsupportedOperationException("Unsupported file content type: " + file.content()); + } + } + + void removedFile(ContentFile file) { + this.removedSize += file.fileSizeInBytes(); + switch (file.content()) { + case DATA: + this.removedFiles += 1; + this.deletedRecords += file.recordCount(); + break; + case POSITION_DELETES: + this.removedDeleteFiles += 1; + this.removedPosDeletes += file.recordCount(); + break; + case EQUALITY_DELETES: + this.removedDeleteFiles += 1; + this.removedEqDeletes += file.recordCount(); + break; + default: + throw new UnsupportedOperationException("Unsupported file content type: " + file.content()); + } } - private static void setIf(boolean expression, ImmutableMap.Builder builder, - String property, Object value) { - if (expression) { - builder.put(property, String.valueOf(value)); + void addedManifest(ManifestFile manifest) { + switch (manifest.content()) { + case DATA: + this.addedFiles += manifest.addedFilesCount(); + this.addedRecords += 
manifest.addedRowsCount(); + this.removedFiles += manifest.deletedFilesCount(); + this.deletedRecords += manifest.deletedRowsCount(); + break; + case DELETES: + this.addedDeleteFiles += manifest.addedFilesCount(); + this.removedDeleteFiles += manifest.deletedFilesCount(); + this.trustSizeAndDeleteCounts = false; + break; } } + + void merge(UpdateMetrics other) { + this.addedFiles += other.addedFiles; + this.removedFiles += other.removedFiles; + this.addedDeleteFiles += other.addedDeleteFiles; + this.removedDeleteFiles += other.removedDeleteFiles; + this.addedSize += other.addedSize; + this.removedSize += other.removedSize; + this.addedRecords += other.addedRecords; + this.deletedRecords += other.deletedRecords; + this.addedPosDeletes += other.addedPosDeletes; + this.removedPosDeletes += other.removedPosDeletes; + this.addedEqDeletes += other.addedEqDeletes; + this.removedEqDeletes += other.removedEqDeletes; + this.trustSizeAndDeleteCounts = trustSizeAndDeleteCounts && other.trustSizeAndDeleteCounts; + } + } + + private static void setIf(boolean expression, ImmutableMap.Builder builder, + String property, Object value) { + if (expression) { + builder.put(property, String.valueOf(value)); + } } } From 8bb3c482495708b335c9c36763c39d2397960cbd Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 2 Oct 2020 13:48:43 -0700 Subject: [PATCH 2/4] Add threshold for partition-level summaries. 
--- .../apache/iceberg/BaseRewriteManifests.java | 1 + .../java/org/apache/iceberg/FastAppend.java | 3 +++ .../iceberg/MergingSnapshotProducer.java | 2 ++ .../org/apache/iceberg/SnapshotSummary.java | 18 ++++++++++++++++-- .../org/apache/iceberg/TableProperties.java | 3 +++ site/docs/configuration.md | 1 + 6 files changed, 26 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 886a8340585f..1b28f364cb35 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -110,6 +110,7 @@ protected Map summary() { summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size())); summaryBuilder.set(REPLACED_MANIFESTS_COUNT, String.valueOf(rewrittenManifests.size() + deletedManifests.size())); summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get())); + summaryBuilder.setPartitionSummaryLimit(0); // do not include partition summaries because data did not change return summaryBuilder.build(); } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 8b88c218a836..3aeff90a43b4 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -31,6 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.PropertyUtil; import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT; @@ -79,6 +80,8 @@ protected String operation() { @Override protected Map summary() { + 
summaryBuilder.setPartitionSummaryLimit(ops.current().propertyAsInt( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT)); return summaryBuilder.build(); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 5febb8130c22..4e8e08c3a829 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -204,6 +204,8 @@ private ManifestFile copyManifest(ManifestFile manifest) { @Override protected Map summary() { + summaryBuilder.setPartitionSummaryLimit(ops.current().propertyAsInt( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT)); return summaryBuilder.build(); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index c81fdca86b07..d3e215e40287 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -68,6 +68,7 @@ public static class Builder { private final Map properties = Maps.newHashMap(); private final Map partitionMetrics = Maps.newHashMap(); private final UpdateMetrics metrics = new UpdateMetrics(); + private int maxChangedPartitionsForSummaries = 0; private long deletedDuplicateFiles = 0L; private boolean trustPartitionMetrics = true; @@ -78,6 +79,19 @@ public void clear() { this.trustPartitionMetrics = true; } + /** + * Sets the maximum number of changed partitions before partition summaries will be excluded. + *

+ * If the number of changed partitions is over this max, summaries will not be included. If the number of changed + * partitions is <= this limit, then partition-level summaries will be included in the summary if they are + * available, and "partition-summaries-included" will be set to "true". + * + * @param max maximum number of changed partitions + */ + public void setPartitionSummaryLimit(int max) { + this.maxChangedPartitionsForSummaries = max; + } + public void incrementDuplicateDeletes() { this.deletedDuplicateFiles += 1; } @@ -167,8 +181,8 @@ public Map build() { Set changedPartitions = partitionMetrics.keySet(); setIf(trustPartitionMetrics, builder, CHANGED_PARTITION_COUNT_PROP, changedPartitions.size()); - if (trustPartitionMetrics && changedPartitions.size() < 100) { - setIf(true, builder, PARTITION_SUMMARY_PROP, "true"); + if (trustPartitionMetrics && changedPartitions.size() <= maxChangedPartitionsForSummaries) { + setIf(changedPartitions.size() > 0, builder, PARTITION_SUMMARY_PROP, "true"); for (String key : changedPartitions) { setIf(key != null, builder, CHANGED_PARTITION_PREFIX + key, partitionSummary(partitionMetrics.get(key))); } diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index ff7e47b5b579..5c2718e29f88 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -99,6 +99,9 @@ private TableProperties() { // If not set, defaults to a "metadata" folder underneath the root path of the table. 
public static final String WRITE_METADATA_LOCATION = "write.metadata.path"; + public static final String WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"; + public static final int WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0; + public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled"; public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true; diff --git a/site/docs/configuration.md b/site/docs/configuration.md index 2badc75792f5..9bb939368834 100644 --- a/site/docs/configuration.md +++ b/site/docs/configuration.md @@ -46,6 +46,7 @@ Iceberg tables support table properties to configure table behavior, like the de | write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full | | write.target-file-size-bytes | Long.MAX_VALUE | Controls the size of files generated to target about this many bytes | | write.wap.enabled | false | Enables write-audit-publish writes | +| write.summary.partition-limit | 0 | Includes partition-level summary stats in snapshot summaries if the changed partition count is less than or equal to this limit | | write.metadata.delete-after-commit.enabled | false | Controls whether to delete the oldest version metadata files after commit | | write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit | From e0b09dcc0251ecbbc144d77d69e26cedb2551534 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 2 Oct 2020 14:01:42 -0700 Subject: [PATCH 3/4] Add tests. 
--- .../org/apache/iceberg/TestFastAppend.java | 72 +++++++++++++++++++ .../org/apache/iceberg/TestMergeAppend.java | 72 +++++++++++++++++++ 2 files changed, 144 insertions(+) diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index b84dafc8bd90..0740e35cbb95 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -406,4 +407,75 @@ public void testInvalidAppendManifest() throws IOException { .appendManifest(manifestWithDeletedFiles) .commit()); } + + @Test + public void testDefaultPartitionSummaries() { + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Set partitionSummaryKeys = table.currentSnapshot().summary().keySet().stream() + .filter(key -> key.startsWith(SnapshotSummary.CHANGED_PARTITION_PREFIX)) + .collect(Collectors.toSet()); + Assert.assertEquals("Should include no partition summaries by default", 0, partitionSummaryKeys.size()); + + String summariesIncluded = table.currentSnapshot().summary() + .getOrDefault(SnapshotSummary.PARTITION_SUMMARY_PROP, "false"); + Assert.assertEquals("Should not set partition-summaries-included to true", "false", summariesIncluded); + + String changedPartitions = table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); + Assert.assertEquals("Should set changed partition count", "1", changedPartitions); + } + + @Test + public void testIncludedPartitionSummaries() { + table.updateProperties() + .set(TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, "1") + .commit(); + + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + 
Set partitionSummaryKeys = table.currentSnapshot().summary().keySet().stream() + .filter(key -> key.startsWith(SnapshotSummary.CHANGED_PARTITION_PREFIX)) + .collect(Collectors.toSet()); + Assert.assertEquals("Should include a partition summary", 1, partitionSummaryKeys.size()); + + String summariesIncluded = table.currentSnapshot().summary() + .getOrDefault(SnapshotSummary.PARTITION_SUMMARY_PROP, "false"); + Assert.assertEquals("Should set partition-summaries-included to true", "true", summariesIncluded); + + String changedPartitions = table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); + Assert.assertEquals("Should set changed partition count", "1", changedPartitions); + + String partitionSummary = table.currentSnapshot().summary() + .get(SnapshotSummary.CHANGED_PARTITION_PREFIX + "data_bucket=0"); + Assert.assertEquals("Summary should include 1 file with 1 record that is 10 bytes", + "added-data-files=1,added-records=1,added-files-size=10", partitionSummary); + } + + @Test + public void testIncludedPartitionSummaryLimit() { + table.updateProperties() + .set(TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, "1") + .commit(); + + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + Set partitionSummaryKeys = table.currentSnapshot().summary().keySet().stream() + .filter(key -> key.startsWith(SnapshotSummary.CHANGED_PARTITION_PREFIX)) + .collect(Collectors.toSet()); + Assert.assertEquals("Should include no partition summaries, over limit", 0, partitionSummaryKeys.size()); + + String summariesIncluded = table.currentSnapshot().summary() + .getOrDefault(SnapshotSummary.PARTITION_SUMMARY_PROP, "false"); + Assert.assertEquals("Should not set partition-summaries-included to true", "false", summariesIncluded); + + String changedPartitions = table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); + Assert.assertEquals("Should set changed partition count", "2", changedPartitions); + } } 
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index ee7d0f547f72..de4456f47f6e 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -1175,4 +1176,75 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { Assert.assertEquals(1000, field.fieldId()); Assert.assertEquals("data_bucket", field.name()); } + + @Test + public void testDefaultPartitionSummaries() { + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Set partitionSummaryKeys = table.currentSnapshot().summary().keySet().stream() + .filter(key -> key.startsWith(SnapshotSummary.CHANGED_PARTITION_PREFIX)) + .collect(Collectors.toSet()); + Assert.assertEquals("Should include no partition summaries by default", 0, partitionSummaryKeys.size()); + + String summariesIncluded = table.currentSnapshot().summary() + .getOrDefault(SnapshotSummary.PARTITION_SUMMARY_PROP, "false"); + Assert.assertEquals("Should not set partition-summaries-included to true", "false", summariesIncluded); + + String changedPartitions = table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); + Assert.assertEquals("Should set changed partition count", "1", changedPartitions); + } + + @Test + public void testIncludedPartitionSummaries() { + table.updateProperties() + .set(TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, "1") + .commit(); + + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Set partitionSummaryKeys = table.currentSnapshot().summary().keySet().stream() + .filter(key -> 
key.startsWith(SnapshotSummary.CHANGED_PARTITION_PREFIX)) + .collect(Collectors.toSet()); + Assert.assertEquals("Should include a partition summary", 1, partitionSummaryKeys.size()); + + String summariesIncluded = table.currentSnapshot().summary() + .getOrDefault(SnapshotSummary.PARTITION_SUMMARY_PROP, "false"); + Assert.assertEquals("Should set partition-summaries-included to true", "true", summariesIncluded); + + String changedPartitions = table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); + Assert.assertEquals("Should set changed partition count", "1", changedPartitions); + + String partitionSummary = table.currentSnapshot().summary() + .get(SnapshotSummary.CHANGED_PARTITION_PREFIX + "data_bucket=0"); + Assert.assertEquals("Summary should include 1 file with 1 record that is 10 bytes", + "added-data-files=1,added-records=1,added-files-size=10", partitionSummary); + } + + @Test + public void testIncludedPartitionSummaryLimit() { + table.updateProperties() + .set(TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, "1") + .commit(); + + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + Set partitionSummaryKeys = table.currentSnapshot().summary().keySet().stream() + .filter(key -> key.startsWith(SnapshotSummary.CHANGED_PARTITION_PREFIX)) + .collect(Collectors.toSet()); + Assert.assertEquals("Should include no partition summaries, over limit", 0, partitionSummaryKeys.size()); + + String summariesIncluded = table.currentSnapshot().summary() + .getOrDefault(SnapshotSummary.PARTITION_SUMMARY_PROP, "false"); + Assert.assertEquals("Should not set partition-summaries-included to true", "false", summariesIncluded); + + String changedPartitions = table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); + Assert.assertEquals("Should set changed partition count", "2", changedPartitions); + } } From 36d59d6b85d7d958b75112aafff68e94cba1a937 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: 
Fri, 2 Oct 2020 16:08:47 -0700 Subject: [PATCH 4/4] Fix checkstyle. --- core/src/main/java/org/apache/iceberg/FastAppend.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 3aeff90a43b4..eaa4cd5ed0e3 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -31,7 +31,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.PropertyUtil; import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;