122 changes: 122 additions & 0 deletions docs/content/branching-and-tagging.md
@@ -0,0 +1,122 @@
---
title: "Branching and Tagging"
url: branching
aliases:
- "tables/branching"
menu:
main:
parent: Tables
weight: 0
---

<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Branching and Tagging

## Overview

Iceberg table metadata maintains a log of snapshots which represent the changes applied to a table.
Snapshots are fundamental in Iceberg as they are the basis for reader isolation and time travel queries.
For controlling metadata size and storage costs, Iceberg provides snapshot lifecycle management procedures such as [`expire_snapshots`](../../spark/spark-procedures/#expire-snapshots), which remove unused snapshots and data files that are no longer necessary based on table snapshot retention properties.

**For more sophisticated snapshot lifecycle management, Iceberg supports branches and tags which are named references to snapshots with their own independent lifecycles. This lifecycle is controlled by branch and tag level retention policies.**
Branches are independent lineages of snapshots and point to the head of the lineage.
Branches and tags have a maximum reference age property which controls when the reference to the snapshot itself should be expired.
Branches have retention properties which define the minimum number of snapshots to retain on a branch as well as the maximum age of individual snapshots to retain on the branch.
These properties are used when the `expireSnapshots` procedure is run.
For details on the algorithm used by `expireSnapshots`, refer to the [spec](../../../spec#snapshot-retention-policy).
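
The same retention settings can also be configured programmatically. Below is a minimal sketch using the Iceberg Java API's `ManageSnapshots` interface; the catalog, table identifier, reference names, and retention values are illustrative.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

Catalog catalog = ...;  // an existing Iceberg catalog
Table table = catalog.loadTable(TableIdentifier.of("db", "table"));
long snapshotId = table.currentSnapshot().snapshotId();

table.manageSnapshots()
    // Tag the current snapshot and keep the tag reference for 90 days
    .createTag("EOQ-01", snapshotId)
    .setMaxRefAgeMs("EOQ-01", 90L * 24 * 60 * 60 * 1000)
    // Create a branch, keep at least 2 snapshots on it, and keep the branch reference for 7 days
    .createBranch("test-branch", snapshotId)
    .setMinSnapshotsToKeep("test-branch", 2)
    .setMaxRefAgeMs("test-branch", 7L * 24 * 60 * 60 * 1000)
    .commit();
```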

## Use Cases

Branching and tagging can be used for handling GDPR requirements and retaining important historical snapshots for auditing.
Branches can also be used as part of data engineering workflows, for example to create experimental branches for testing and validating new jobs.
See below for some examples of how branching and tagging can facilitate these use cases.

### Historical Tags

Tags can be used for retaining important historical snapshots for auditing purposes.

![Historical Tags](../img/historical-snapshot-tag.png)

The above diagram demonstrates retaining important historical snapshots with the following retention policy, defined
via Spark SQL.

1. Retain 1 snapshot per week for 1 month. This can be achieved by tagging the weekly snapshot and setting the tag retention to be a month.
```sql
-- Create a tag for the first end of week snapshot. Retain the snapshot for a week
ALTER TABLE prod.db.table CREATE TAG `EOW-01` AS OF VERSION 7 RETAIN 7 DAYS
```

2. Retain 1 snapshot per month for 6 months. This can be achieved by tagging the monthly snapshot and setting the tag retention to be 6 months.
```sql
-- Create a tag for the first end of month snapshot. Retain the snapshot for 6 months
ALTER TABLE prod.db.table CREATE TAG `EOM-01` AS OF VERSION 30 RETAIN 180 DAYS
```

3. Retain 1 snapshot per year forever. This can be achieved by tagging the annual snapshot. The default retention for branches and tags is forever.
```sql
-- Create a tag for the end of the year and retain it forever.
ALTER TABLE prod.db.table CREATE TAG `EOY-2023` AS OF VERSION 365
```

4. Create a temporary "test-branch" which is retained for 7 days and the latest 2 snapshots on the branch are retained.
```sql
-- Create a branch "test-branch" which will be retained for 7 days along with the latest 2 snapshots
ALTER TABLE prod.db.table CREATE BRANCH `test-branch` RETAIN 7 DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS
```

### Audit Branch

![Audit Branch](../img/audit-branch.png)

The above diagram shows an example of using an audit branch for validating a write workflow.

1. First ensure `write.wap.enabled` is set.
```sql
ALTER TABLE db.table SET TBLPROPERTIES (
    'write.wap.enabled' = 'true'
)
```
2. Create `audit-branch` starting from snapshot 3, which will be written to and retained for 1 week.
```sql
ALTER TABLE db.table CREATE BRANCH `audit-branch` AS OF VERSION 3 RETAIN 7 DAYS
```
3. Writes are performed on a separate `audit-branch` independent from the main table history.
```sql
-- WAP Branch write
SET spark.wap.branch = 'audit-branch'
INSERT INTO prod.db.table VALUES (3, 'c')
```
4. A validation workflow can then validate the state of `audit-branch` (e.g. run data quality checks), as sketched below.
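The following is a sketch of such a validation read, assuming the Spark `branch` read option is available; the row-count check is only a placeholder for real data quality checks.
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = ...;  // existing Spark session with the Iceberg catalog configured

// Read the branch, not the main table state
Dataset<Row> auditState = spark.read()
    .option("branch", "audit-branch")
    .table("prod.db.table");

long rowCount = auditState.count();  // placeholder data quality check
```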
5. After validation, the main branch can be fast-forwarded to the head of `audit-branch` to update the main table state.
```java
table.manageSnapshots().fastForward("main", "audit-branch").commit()
```
6. The branch reference will be removed when `expireSnapshots` is run 1 week later.
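
The expiration in step 6 can be triggered with the [`expire_snapshots`](../../spark/spark-procedures/#expire-snapshots) procedure, or programmatically; a minimal sketch via the Java API, assuming the table has already been loaded from a catalog:

```java
import org.apache.iceberg.Table;

Table table = ...;  // the table loaded from a catalog

// Removes snapshots and references (such as `audit-branch`) that have exceeded
// the retention settings described above
table.expireSnapshots().commit();
```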

## Usage

Creating, querying, and writing to branches and tags are supported in the Iceberg Java library as well as in the Spark and Flink engine integrations.

- [Iceberg Java Library](../../java-api-quickstart/#branching-and-tagging)
- [Spark DDLs](../spark-ddl/#branching-and-tagging-ddl)
- [Spark Reads](../spark-queries/#time-travel)
- [Spark Branch Writes](../spark-writes/#writing-to-branches)
- [Flink Reads](../flink-queries/#reading-branches-and-tags-with-sql)
- [Flink Branch Writes](../flink-writes/#branch-writes)
122 changes: 122 additions & 0 deletions docs/content/delta-lake-migration.md
@@ -0,0 +1,122 @@
---
title: "Delta Lake Migration"
url: delta-lake-migration
menu:
main:
parent: "Migration"
weight: 300
---
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Delta Lake Table Migration
Delta Lake is a table format that supports the Parquet file format and provides time travel and versioning features. When migrating data from Delta Lake to Iceberg,
it is common to migrate all snapshots to maintain the history of the data.

Currently, Iceberg supports the Snapshot Table action for migrating from Delta Lake to Iceberg tables.
Since Delta Lake tables maintain transactions, all available transactions will be committed to the new Iceberg table as transactions in order.
For Delta Lake tables, any additional data files added after the initial migration will be included in their corresponding transactions and subsequently added to the new Iceberg table using the Add Transaction action.
The Add Transaction action, a variant of the Add File action, is still under development.

## Enabling Migration from Delta Lake to Iceberg
The `iceberg-delta-lake` module is not bundled with Spark and Flink engine runtimes. To enable the migration from Delta Lake, the minimum required dependencies are:
- [iceberg-delta-lake](https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-delta-lake/1.2.1/iceberg-delta-lake-1.2.1.jar)
- [delta-standalone-0.6.0](https://repo1.maven.org/maven2/io/delta/delta-standalone_2.13/0.6.0/delta-standalone_2.13-0.6.0.jar)
- [delta-storage-2.2.0](https://repo1.maven.org/maven2/io/delta/delta-storage/2.2.0/delta-storage-2.2.0.jar)

### Compatibilities
The module is built and tested with `Delta Standalone:0.6.0` and supports Delta Lake tables with the following protocol version:
* `minReaderVersion`: 1
* `minWriterVersion`: 2

Please refer to [Delta Lake Table Protocol Versioning](https://docs.delta.io/latest/versioning.html) for more details about Delta Lake protocol versions.

### API
The `iceberg-delta-lake` module provides an interface named `DeltaLakeToIcebergMigrationActionsProvider`, which contains actions that help convert Delta Lake tables to Iceberg.
The supported actions are:
* `snapshotDeltaLakeTable`: snapshot an existing Delta Lake table to an Iceberg table

### Default Implementation
The `iceberg-delta-lake` module also provides a default implementation of the interface which can be accessed by
```java
DeltaLakeToIcebergMigrationActionsProvider defaultActions = DeltaLakeToIcebergMigrationActionsProvider.defaultActions();
```

## Snapshot Delta Lake Table to Iceberg
The action `snapshotDeltaLakeTable` reads the Delta Lake table's transactions and converts them to a new Iceberg table with the same schema and partitioning in a single Iceberg transaction.
The original Delta Lake table remains unchanged.

The newly created table can be changed or written to without affecting the source table, but the snapshot uses the original table's data files.
Existing data files are added to the Iceberg table's metadata and can be read using a name-to-id mapping created from the original table schema.

When inserts or overwrites run on the snapshot table, new files are placed in the snapshot table's location. By default, this location is the same as that
of the source Delta Lake table, but users can also specify a different location for the snapshot table.
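
A different location can be passed through the optional `tableLocation` configuration; the following is a condensed sketch of the full example shown below, with placeholder paths:

```java
// Snapshot the Delta Lake table into an Iceberg table that writes new files to its own location
DeltaLakeToIcebergMigrationActionsProvider.defaultActions()
    .snapshotDeltaLakeTable("s3://my-bucket/delta-table")
    .as(TableIdentifier.of("my_db", "my_table"))
    .icebergCatalog(icebergCatalog)                  // an existing Iceberg catalog
    .deltaLakeConfiguration(hadoopConf)              // Hadoop conf that can read the source table
    .tableLocation("s3://my-bucket/iceberg-table")   // optional: defaults to the source table's location
    .execute();
```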

{{< hint info >}}
Because tables created by `snapshotDeltaLakeTable` are not the sole owners of their data files, they are prohibited from
actions like `expire_snapshots` which would physically delete data files. Iceberg deletes, which only affect metadata,
are still allowed. In addition, any operations which affect the original data files will disrupt the snapshot table's
integrity. DELETE statements executed against the original Delta Lake table will remove the original data files, and the
`snapshotDeltaLakeTable` table will no longer be able to access them.
{{< /hint >}}

#### Usage
| Required Input | Configured By | Description |
|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|
| Source Table Location | Argument [`sourceTableLocation`](https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationActionsProvider.html#snapshotDeltaLakeTable(java.lang.String)) | The location of the source Delta Lake table |
| New Iceberg Table Identifier | Configuration API [`as`](https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/delta/SnapshotDeltaLakeTable.html#as(org.apache.iceberg.catalog.TableIdentifier)) | The identifier specifies the namespace and table name for the new Iceberg table |
| Iceberg Catalog | Configuration API [`icebergCatalog`](https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/delta/SnapshotDeltaLakeTable.html#icebergCatalog(org.apache.iceberg.catalog.Catalog)) | The catalog used to create the new Iceberg table |
| Hadoop Configuration | Configuration API [`deltaLakeConfiguration`](https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/delta/SnapshotDeltaLakeTable.html#deltaLakeConfiguration(org.apache.hadoop.conf.Configuration)) | The Hadoop Configuration used to read the source Delta Lake table. |

For detailed usage and other optional configurations, please refer to the [SnapshotDeltaLakeTable API](https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/delta/SnapshotDeltaLakeTable.html).

#### Output
| Output Name | Type | Description |
| ------------|------|-------------|
| `imported_files_count` | long | Number of files added to the new table |

#### Added Table Properties
By default, the following table properties are added to the newly created Iceberg table:

| Property Name | Value | Description |
|-------------------------------|-------------------------------------------|--------------------------------------------------------------------|
| `snapshot_source`             | `delta`                                    | Indicates that the table was created by snapshotting a Delta Lake table |
| `original_location`           | location of the Delta Lake table           | The absolute path to the location of the original Delta Lake table      |
| `schema.name-mapping.default` | JSON name mapping derived from the schema  | The name mapping string used to read the Delta Lake table's data files  |

#### Examples
```java
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.Catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.delta.DeltaLakeToIcebergMigrationActionsProvider;

String sourceDeltaLakeTableLocation = "s3://my-bucket/delta-table";
String destTableLocation = "s3://my-bucket/iceberg-table";
TableIdentifier destTableIdentifier = TableIdentifier.of("my_db", "my_table");
Catalog icebergCatalog = ...; // Iceberg Catalog fetched from engines like Spark or created via CatalogUtil.loadCatalog
Configuration hadoopConf = ...; // Hadoop Configuration fetched from engines like Spark, with the proper file system configuration to access the Delta Lake table.

DeltaLakeToIcebergMigrationActionsProvider.defaultActions()
.snapshotDeltaLakeTable(sourceDeltaLakeTableLocation)
.as(destTableIdentifier)
.icebergCatalog(icebergCatalog)
.tableLocation(destTableLocation)
.deltaLakeConfiguration(hadoopConf)
.tableProperty("my_property", "my_value")
.execute();
```
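
Once the action completes, the snapshot table can be loaded from the catalog like any other Iceberg table. A small sketch, continuing the example above, that reads back the properties listed earlier:

```java
import org.apache.iceberg.Table;

// Load the newly created table and inspect the properties added by the migration
Table snapshotTable = icebergCatalog.loadTable(destTableIdentifier);
String source = snapshotTable.properties().get("snapshot_source");      // "delta"
String original = snapshotTable.properties().get("original_location");  // location of the source Delta Lake table
```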
6 changes: 5 additions & 1 deletion docs/content/flink-configuration.md
@@ -118,7 +118,11 @@ env.getConfig()
| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for FIP27 Source. |
| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
| branch | N/A | N/A | main | Specifies the branch to read from in batch mode. |
| tag | N/A | N/A | null | Specifies the tag to read from in batch mode. |
| start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads. |
| end-tag | N/A | N/A | null | Specifies the ending tag to read from for incremental reads. |
| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
| split-lookback | connector.iceberg.split-file-open-cost | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
10 changes: 10 additions & 0 deletions docs/content/flink-getting-started.md
Expand Up @@ -271,6 +271,16 @@ FlinkSink.forRowData(input)
env.execute("Test Iceberg DataStream");
```

### Branch Writes
Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.
For more information on branches, please refer to [branches](../../tables/branching).
```java
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.toBranch("audit-branch")
.append();
```

## Reading

Submit a Flink __batch__ job using the following sentences: