Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flink-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
strategy:
matrix:
jvm: [8, 11]
flink: ['1.15', '1.16', '1.17']
flink: ['1.16', '1.17', '1.18']
env:
SPARK_LOCAL_IP: localhost
steps:
Expand Down
2 changes: 1 addition & 1 deletion dev/stage-binaries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#

SCALA_VERSION=2.12
FLINK_VERSIONS=1.15,1.16,1.17
FLINK_VERSIONS=1.16,1.17,1.18
SPARK_VERSIONS=3.2,3.3,3.4,3.5
HIVE_VERSIONS=2,3

Expand Down
8 changes: 4 additions & 4 deletions flink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")

if (flinkVersions.contains("1.15")) {
apply from: file("$projectDir/v1.15/build.gradle")
}

if (flinkVersions.contains("1.16")) {
apply from: file("$projectDir/v1.16/build.gradle")
}

if (flinkVersions.contains("1.17")) {
apply from: file("$projectDir/v1.17/build.gradle")
}

if (flinkVersions.contains("1.18")) {
apply from: file("$projectDir/v1.18/build.gradle")
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
Expand Down Expand Up @@ -93,10 +92,7 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setConfiguration(
reporter.addToConfiguration(
// disable classloader check as Avro may cache class in the serializers.
new Configuration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)))
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.withHaLeadershipControl()
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private void testOneSplitFetcher(
ReaderUtil.createCombinedScanTask(
recordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);
reader.addSplits(Collections.singletonList(split));
reader.addSplits(Arrays.asList(split));

while (readerOutput.getEmittedRecords().size() < expectedCount) {
reader.pollNext(readerOutput);
Expand Down
36 changes: 18 additions & 18 deletions flink/v1.15/build.gradle → flink/v1.18/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

String flinkMajorVersion = '1.15'
String flinkMajorVersion = '1.18'
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")

project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
Expand All @@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')

compileOnly libs.flink115.avro
compileOnly libs.flink118.avro
// for dropwizard histogram metrics implementation
compileOnly libs.flink115.metrics.dropwizard
compileOnly libs.flink115.streaming.java
compileOnly "${libs.flink115.streaming.java.get().module}:${libs.flink115.streaming.java.get().getVersion()}:tests"
compileOnly libs.flink115.table.api.java.bridge
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink115.get()}"
compileOnly libs.flink115.connector.base
compileOnly libs.flink115.connector.files
compileOnly libs.flink118.metrics.dropwizard
compileOnly libs.flink118.streaming.java
compileOnly "${libs.flink118.streaming.java.get().module}:${libs.flink118.streaming.java.get().getVersion()}:tests"
compileOnly libs.flink118.table.api.java.bridge
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"
compileOnly libs.flink118.connector.base
compileOnly libs.flink118.connector.files

compileOnly libs.hadoop2.hdfs
compileOnly libs.hadoop2.common
Expand All @@ -65,13 +65,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.apache.hive', module: 'hive-storage-api'
}

testImplementation libs.flink115.connector.test.utils
testImplementation libs.flink115.core
testImplementation libs.flink115.runtime
testImplementation(libs.flink115.test.utilsjunit) {
testImplementation libs.flink118.connector.test.utils
testImplementation libs.flink118.core
testImplementation libs.flink118.runtime
testImplementation(libs.flink118.test.utilsjunit) {
exclude group: 'junit'
}
testImplementation(libs.flink115.test.utils) {
testImplementation(libs.flink118.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}
Expand Down Expand Up @@ -164,7 +164,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
}

// for dropwizard histogram metrics implementation
implementation libs.flink115.metrics.dropwizard
implementation libs.flink118.metrics.dropwizard

// for integration testing with the flink-runtime-jar
// all of those dependencies are required because the integration test extends FlinkTestBase
Expand All @@ -174,13 +174,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts")
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation(libs.flink115.test.utils) {
integrationImplementation(libs.flink118.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}

integrationImplementation libs.flink115.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink115.get()}"
integrationImplementation libs.flink118.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"

integrationImplementation libs.hadoop2.common
integrationImplementation libs.hadoop2.hdfs
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
Expand All @@ -60,15 +61,14 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.flink.util.FlinkAlterTableUtil;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -91,7 +91,6 @@
* independent of the partition of Flink.
*/
public class FlinkCatalog extends AbstractCatalog {

private final CatalogLoader catalogLoader;
private final Catalog icebergCatalog;
private final Namespace baseNamespace;
Expand Down Expand Up @@ -439,14 +438,35 @@ private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTab
if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
&& Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
&& equalsPrimary)) {
throw new UnsupportedOperationException("Altering schema is not supported yet.");
throw new UnsupportedOperationException(
"Altering schema is not supported in the old alterTable API. "
+ "To alter schema, use the other alterTable API and provide a list of TableChange's.");
}

validateTablePartition(ct1, ct2);
}

/**
 * Ensures the two table definitions agree on partition keys.
 *
 * @throws UnsupportedOperationException if the partition keys differ, since altering
 *     partitioning is not supported by this catalog
 */
private static void validateTablePartition(CatalogTable ct1, CatalogTable ct2) {
  boolean samePartitionKeys = ct1.getPartitionKeys().equals(ct2.getPartitionKeys());
  if (!samePartitionKeys) {
    throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
  }
}

/**
* This alterTable API only supports altering table properties.
*
* <p>Support for adding/removing/renaming columns cannot be done by comparing CatalogTable
* instances, unless the Flink schema contains Iceberg column IDs.
*
* <p>To alter columns, use the other alterTable API and provide a list of TableChange's.
*
* @param tablePath path of the table or view to be modified
* @param newTable the new table definition
* @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if set
* to false, throw an exception, if set to true, do nothing.
* @throws CatalogException in case of any runtime exception
* @throws TableNotExistException if the table does not exist
*/
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws CatalogException, TableNotExistException {
Expand All @@ -464,12 +484,6 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
}

CatalogTable table = toCatalogTable(icebergTable);

// Currently, Flink SQL only support altering table properties.

// For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by
// comparing
// CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
validateTableSchemaAndPartition(table, (CatalogTable) newTable);

Map<String, String> oldProperties = table.getOptions();
Expand Down Expand Up @@ -507,7 +521,66 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
}
});

commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
FlinkAlterTableUtil.commitChanges(
icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
}

/**
 * Alters an Iceberg table from a list of Flink {@link TableChange}s.
 *
 * <p>{@code SetOption} changes for the reserved keys {@code location},
 * {@code current-snapshot-id} and {@code cherry-pick-snapshot-id} map to the corresponding
 * Iceberg table operations; every other set/reset option is treated as a table property
 * change, and all remaining changes are treated as schema changes. Altering partition keys
 * is not supported.
 *
 * @param tablePath path of the table to be modified
 * @param newTable the new table definition
 * @param tableChanges the list of changes to apply
 * @param ignoreIfNotExists if true, a missing table is silently ignored; otherwise
 *     {@link TableNotExistException} is thrown
 * @throws TableNotExistException if the table does not exist and ignoreIfNotExists is false
 * @throws CatalogException in case of any runtime exception
 */
@Override
public void alterTable(
    ObjectPath tablePath,
    CatalogBaseTable newTable,
    List<TableChange> tableChanges,
    boolean ignoreIfNotExists)
    throws TableNotExistException, CatalogException {
  validateFlinkTable(newTable);

  Table icebergTable;
  try {
    icebergTable = loadIcebergTable(tablePath);
  } catch (TableNotExistException e) {
    if (ignoreIfNotExists) {
      return;
    }

    throw e;
  }

  // Altering partition keys is not supported yet; fail fast before classifying changes.
  validateTablePartition(toCatalogTable(icebergTable), (CatalogTable) newTable);

  String setLocation = null;
  String setSnapshotId = null;
  String cherrypickSnapshotId = null;

  // Split the incoming changes into reserved options, property changes, and schema changes.
  List<TableChange> propertyChanges = Lists.newArrayList();
  List<TableChange> schemaChanges = Lists.newArrayList();
  for (TableChange change : tableChanges) {
    if (change instanceof TableChange.SetOption) {
      TableChange.SetOption option = (TableChange.SetOption) change;
      String key = option.getKey();

      if ("location".equalsIgnoreCase(key)) {
        setLocation = option.getValue();
      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
        setSnapshotId = option.getValue();
      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
        cherrypickSnapshotId = option.getValue();
      } else {
        // Any non-reserved option is an Iceberg table property update.
        propertyChanges.add(change);
      }
    } else if (change instanceof TableChange.ResetOption) {
      propertyChanges.add(change);
    } else {
      schemaChanges.add(change);
    }
  }

  FlinkAlterTableUtil.commitChanges(
      icebergTable,
      setLocation,
      setSnapshotId,
      cherrypickSnapshotId,
      schemaChanges,
      propertyChanges);
}

private static void validateFlinkTable(CatalogBaseTable table) {
Expand Down Expand Up @@ -552,52 +625,6 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc
return partitionKeysBuilder.build();
}

/**
 * Applies the supported ALTER TABLE changes to the underlying Iceberg table.
 *
 * <p>Snapshot operations (setting the current snapshot, cherry-picking) are committed first
 * and individually; the location update and property updates are then grouped into a single
 * Iceberg transaction.
 *
 * @param table the Iceberg table to modify
 * @param setLocation new table location, or null if the location is unchanged
 * @param setSnapshotId snapshot ID to set as the current snapshot, or null
 * @param pickSnapshotId snapshot ID to cherry-pick, or null
 * @param setProperties property updates to apply; a null value removes the property,
 *     any other value sets it
 */
private static void commitChanges(
    Table table,
    String setLocation,
    String setSnapshotId,
    String pickSnapshotId,
    Map<String, String> setProperties) {
  // don't allow setting the snapshot and picking a commit at the same time because order is
  // ambiguous and choosing one order leads to different results
  Preconditions.checkArgument(
      setSnapshotId == null || pickSnapshotId == null,
      "Cannot set the current snapshot ID and cherry-pick snapshot changes");

  if (setSnapshotId != null) {
    long newSnapshotId = Long.parseLong(setSnapshotId);
    table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
  }

  // if updating the table snapshot, perform that update first in case it fails
  if (pickSnapshotId != null) {
    long newSnapshotId = Long.parseLong(pickSnapshotId);
    table.manageSnapshots().cherrypick(newSnapshotId).commit();
  }

  // Location and property updates are combined into one transaction so they commit atomically.
  Transaction transaction = table.newTransaction();

  if (setLocation != null) {
    transaction.updateLocation().setLocation(setLocation).commit();
  }

  if (!setProperties.isEmpty()) {
    UpdateProperties updateProperties = transaction.updateProperties();
    setProperties.forEach(
        (k, v) -> {
          if (v == null) {
            updateProperties.remove(k);
          } else {
            updateProperties.set(k, v);
          }
        });
    updateProperties.commit();
  }

  transaction.commitTransaction();
}

static CatalogTable toCatalogTable(Table table) {
TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public static LogicalType convert(Type type) {
return TypeUtil.visit(type, new TypeToFlinkType());
}

/**
 * Convert a {@link LogicalType Flink type} to an equivalent Iceberg {@link Type}.
 *
 * @param flinkType the Flink logical type to convert
 * @return the equivalent Iceberg type
 */
public static Type convert(LogicalType flinkType) {
  FlinkTypeToType converter = new FlinkTypeToType();
  return flinkType.accept(converter);
}

/**
* Convert a {@link RowType} to a {@link TableSchema}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class FlinkTypeToType extends FlinkTypeVisitor<Type> {
private final RowType root;
private int nextId;

/**
 * Creates a visitor without an enclosing row type.
 *
 * <p>NOTE(review): with a null root, any visitor logic that reads {@code root} (e.g. field-ID
 * assignment relative to the root struct) is presumably bypassed — confirm this constructor is
 * only used for converting types that are not anchored at a root RowType.
 */
FlinkTypeToType() {
  this.root = null;
}

FlinkTypeToType(RowType root) {
this.root = root;
// the root struct's fields use the first ids
Expand Down
Loading