From 63d2a972a9b911187e6e40d667a78f2670035008 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Wed, 31 Jul 2019 16:35:52 -0700
Subject: [PATCH] Spark: Add support for write-audit-publish workflows.

---
 .../org/apache/iceberg/SnapshotUpdate.java    |  7 +++++++
 .../org/apache/iceberg/SnapshotProducer.java  | 15 +++++++++++++-
 .../org/apache/iceberg/TableMetadata.java     | 10 ++++++++++
 .../org/apache/iceberg/TableProperties.java   |  3 +++
 .../iceberg/spark/source/IcebergSource.java   |  3 ++-
 .../apache/iceberg/spark/source/Writer.java   | 20 +++++++++++++++++++
 6 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index 5cabc023d1b0..fdd1a63510b9 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -45,4 +45,11 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
    */
   ThisT deleteWith(Consumer<String> deleteFunc);
 
+  /**
+   * Called to stage a snapshot in table metadata, but not update the current snapshot id.
+   *
+   * @return this for method chaining
+   */
+  ThisT stageOnly();
+
 }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index e8cc29baf7ea..83f15b5814a2 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -79,6 +79,7 @@ public void accept(String file) {
   private final List<String> manifestLists = Lists.newArrayList();
   private Long snapshotId = null;
   private TableMetadata base = null;
+  private boolean stageOnly = false;
   private Consumer<String> deleteFunc = defaultDelete;
 
   protected SnapshotProducer(TableOperations ops) {
@@ -96,6 +97,12 @@ protected SnapshotProducer(TableOperations ops) {
 
   protected abstract ThisT self();
 
+  @Override
+  public ThisT stageOnly() {
+    this.stageOnly = true;
+    return self();
+  }
+
   @Override
   public ThisT deleteWith(Consumer<String> deleteCallback) {
     Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once");
@@ -230,7 +237,13 @@ public void commit() {
         .run(taskOps -> {
           Snapshot newSnapshot = apply();
           newSnapshotId.set(newSnapshot.snapshotId());
-          TableMetadata updated = base.replaceCurrentSnapshot(newSnapshot);
+          TableMetadata updated;
+          if (stageOnly) {
+            updated = base.addStagedSnapshot(newSnapshot);
+          } else {
+            updated = base.replaceCurrentSnapshot(newSnapshot);
+          }
+
           // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
           // to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
           taskOps.commit(base, updated.withUUID());
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index ef289b4ec727..ea015ba201c3 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -321,6 +321,16 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
         currentSnapshotId, snapshots, snapshotLog);
   }
 
+  public TableMetadata addStagedSnapshot(Snapshot snapshot) {
+    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
+        .addAll(snapshots)
+        .add(snapshot)
+        .build();
+    return new TableMetadata(ops, null, uuid, location,
+        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, newSnapshots, snapshotLog);
+  }
+
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
     List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
         .addAll(snapshots)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 4dd778d165b8..2ca8a25e7d9c 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -93,4 +93,7 @@ private TableProperties() {}
   public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)";
 
   public static final String DEFAULT_NAME_MAPPING = "schema.name-mapping.default";
+
+  public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
+  public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index b6dfade9a09d..3656787d2911 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -75,7 +75,8 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     validateWriteSchema(table.schema(), dsStruct);
     String appId = lazySparkSession().sparkContext().applicationId();
-    return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId));
+    String wapId = lazySparkSession().conf().get("spark.wap.id", null);
+    return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId, wapId));
   }
 
   @Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index aa9becc12ea8..f5205057e584 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -45,6 +45,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
@@ -87,14 +88,20 @@
   private final EncryptionManager encryptionManager;
   private final boolean replacePartitions;
   private final String applicationId;
+  private final String wapId;
 
   Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
+    this(table, options, replacePartitions, applicationId, null);
+  }
+
+  Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId) {
     this.table = table;
     this.format = getFileFormat(table.properties(), options);
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
     this.replacePartitions = replacePartitions;
     this.applicationId = applicationId;
+    this.wapId = wapId;
   }
 
   private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -104,6 +111,11 @@ private FileFormat getFileFormat(Map<String, String> tableProperties, DataSource
     return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
   }
 
+  private boolean isWapTable() {
+    return Boolean.parseBoolean(table.properties().getOrDefault(
+        TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
+  }
+
   @Override
   public DataWriterFactory<InternalRow> createWriterFactory() {
     return new WriterFactory(
@@ -124,6 +136,14 @@ protected void commitOperation(SnapshotUpdate<?> operation, int numFiles, String
     if (applicationId != null) {
       operation.set("spark.app.id", applicationId);
     }
+
+    if (isWapTable() && wapId != null) {
+      // write-audit-publish is enabled for this table and job
+      // stage the changes without changing the current snapshot
+      operation.set("wap.id", wapId);
+      operation.stageOnly();
+    }
+
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails
     long duration = System.currentTimeMillis() - start;
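
Notes (not part of the patch): the staging behavior is exposed through the new
SnapshotUpdate.stageOnly() method, so it can also be driven from the core API
directly. A minimal sketch, assuming a loaded Table and a DataFile built
elsewhere (both hypothetical):

    // Stage an append: commit() writes the snapshot into table metadata
    // but leaves the current snapshot id unchanged, so readers see nothing new.
    table.newAppend()
        .appendFile(dataFile)               // DataFile assumed to exist
        .set("wap.id", "job-20190731-001")  // recorded in the snapshot summary
        .stageOnly()
        .commit();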
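From Spark, the same flow is wired up automatically when the table sets
write.wap.enabled and the session carries a spark.wap.id. A sketch of the
end-to-end write-and-audit steps; the table location, source view, and WAP id
are hypothetical, and publishing the audited snapshot is outside the scope of
this patch:

    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class WapFlowExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("wap-example").getOrCreate();

        // opt the table in to write-audit-publish
        HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
        Table table = tables.load("hdfs:/tmp/tables/logs");
        table.updateProperties()
            .set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true")
            .commit();

        // tag this job's writes; IcebergSource passes the id to the Writer
        spark.conf().set("spark.wap.id", "job-20190731-001");

        // the commit is staged: snapshot metadata is written, but the current
        // snapshot id is unchanged, so readers do not see the new data yet
        Dataset<Row> df = spark.table("source_logs");
        df.write().format("iceberg").mode("append").save("hdfs:/tmp/tables/logs");

        // audit: locate the staged snapshot by its wap.id summary property
        table.refresh();
        for (Snapshot snap : table.snapshots()) {
          if ("job-20190731-001".equals(snap.summary().get("wap.id"))) {
            System.out.println("staged snapshot for audit: " + snap.snapshotId());
          }
        }
      }
    }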