Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
*/
ThisT deleteWith(Consumer<String> deleteFunc);

/**
 * Called to stage a snapshot in table metadata, but not update the current snapshot id.
 * <p>
 * The snapshot produced by this update is added to the table's snapshot list, but the
 * current snapshot pointer is left unchanged, so readers do not see the staged changes
 * until the snapshot is made current by a later operation.
 *
 * @return this for method chaining
 */
ThisT stageOnly();

}
15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void accept(String file) {
private final List<String> manifestLists = Lists.newArrayList();
private Long snapshotId = null;
private TableMetadata base = null;
// when true, commit() adds the new snapshot as staged metadata instead of
// replacing the current snapshot (see commit() below)
private boolean stageOnly = false;
private Consumer<String> deleteFunc = defaultDelete;

protected SnapshotProducer(TableOperations ops) {
Expand All @@ -96,6 +97,12 @@ protected SnapshotProducer(TableOperations ops) {

protected abstract ThisT self();

/**
 * Marks this update as stage-only: commit() will add the produced snapshot to table
 * metadata without updating the current snapshot id.
 *
 * @return this for method chaining
 */
@Override
public ThisT stageOnly() {
this.stageOnly = true;
return self();
}

@Override
public ThisT deleteWith(Consumer<String> deleteCallback) {
Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once");
Expand Down Expand Up @@ -230,7 +237,13 @@ public void commit() {
.run(taskOps -> {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata updated = base.replaceCurrentSnapshot(newSnapshot);
TableMetadata updated;
if (stageOnly) {
updated = base.addStagedSnapshot(newSnapshot);
} else {
updated = base.replaceCurrentSnapshot(newSnapshot);
}

// if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
// to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
taskOps.commit(base, updated.withUUID());
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
currentSnapshotId, snapshots, snapshotLog);
}

/**
 * Returns a new TableMetadata with {@code snapshot} appended to the snapshot list.
 * <p>
 * The current snapshot id is carried over unchanged, so the added snapshot is staged
 * rather than made current; the metadata's last-updated timestamp is taken from the
 * staged snapshot.
 */
public TableMetadata addStagedSnapshot(Snapshot snapshot) {
  ImmutableList.Builder<Snapshot> withStaged = ImmutableList.builder();
  withStaged.addAll(snapshots);
  withStaged.add(snapshot);
  return new TableMetadata(ops, null, uuid, location,
      snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
      currentSnapshotId, withStaged.build(), snapshotLog);
}

public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
.addAll(snapshots)
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,7 @@ private TableProperties() {}
public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)";

public static final String DEFAULT_NAME_MAPPING = "schema.name-mapping.default";

// write-audit-publish (WAP): when this table property is "true", writers that carry a
// wap id stage their snapshot in table metadata instead of making it current
public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct
Table table = getTableAndResolveHadoopConfiguration(options, conf);
validateWriteSchema(table.schema(), dsStruct);
String appId = lazySparkSession().sparkContext().applicationId();
return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId));
String wapId = lazySparkSession().conf().get("spark.wap.id", null);
return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId, wapId));
}

@Override
Expand Down
20 changes: 20 additions & 0 deletions spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
Expand Down Expand Up @@ -87,14 +88,20 @@ class Writer implements DataSourceWriter {
private final EncryptionManager encryptionManager;
private final boolean replacePartitions;
private final String applicationId;
private final String wapId;

// convenience constructor for writes that have no write-audit-publish (WAP) id
Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
this(table, options, replacePartitions, applicationId, null);
}

// wapId may be null; when non-null and the table enables WAP, commits are staged
// instead of replacing the current snapshot (see commitOperation)
Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId) {
this.table = table;
this.format = getFileFormat(table.properties(), options);
this.fileIo = table.io();
this.encryptionManager = table.encryption();
this.replacePartitions = replacePartitions;
this.applicationId = applicationId;
this.wapId = wapId;
}

private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
Expand All @@ -104,6 +111,11 @@ private FileFormat getFileFormat(Map<String, String> tableProperties, DataSource
return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}

/**
 * Returns whether write-audit-publish is enabled for the target table, as configured by
 * the {@code write.wap.enabled} table property (defaults to false).
 */
private boolean isWapTable() {
  String enabled = table.properties().getOrDefault(
      TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT);
  return Boolean.parseBoolean(enabled);
}

@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
return new WriterFactory(
Expand All @@ -124,6 +136,14 @@ protected void commitOperation(SnapshotUpdate<?> operation, int numFiles, String
if (applicationId != null) {
operation.set("spark.app.id", applicationId);
}

if (isWapTable() && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
operation.set("wap.id", wapId);
operation.stageOnly();
}

long start = System.currentTimeMillis();
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
Expand Down