Conversation

@JingsongLi
Contributor

Fixes #1392

  1. Create table: only identity partitioning is supported for now (hidden partitioning has to wait for Flink SQL support), but all existing Iceberg tables can be loaded.
  2. Alter table:
    • Currently, Flink SQL only supports altering table properties, not the schema or the partition spec.
    • The location, current-snapshot-id and cherry-pick-snapshot-id properties are also supported, like Spark (see the sketch below).
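A minimal sketch of the DDL shapes this covers, assuming a TableEnvironment named tEnv with this Iceberg catalog registered and selected as the current catalog (table names and property values are illustrative):

// Create: only identity partition columns until Flink SQL supports hidden partitioning.
tEnv.executeSql("CREATE TABLE tl (id BIGINT, data STRING, dt STRING) PARTITIONED BY (dt)");

// Alter: Flink SQL currently only alters table properties; 'location',
// 'current-snapshot-id' and 'cherry-pick-snapshot-id' get special handling, like Spark.
tEnv.executeSql("ALTER TABLE tl SET ('write.format.default'='avro')");
tEnv.executeSql("ALTER TABLE tl SET ('location'='/tmp/location')");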


Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
Map<String, String> options = Maps.newHashMap(table.getOptions());
Member

nit: better to use ImmutableMap ?
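A sketch of that suggestion, assuming Guava's ImmutableMap (Iceberg code usually uses a relocated Guava package, so the exact import may differ):

import java.util.Map;
import com.google.common.collect.ImmutableMap;

// Immutable defensive copy of the Flink table options instead of a mutable HashMap.
Map<String, String> options = ImmutableMap.copyOf(table.getOptions());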

Table icebergTable = getIcebergTable(tablePath);
CatalogTable table = toCatalogTable(icebergTable);

// Currently, Flink SQL only support altering table properties.
Member

What's the reason that we could not support adding/removing/renaming columns?

Contributor Author

There is no Flink DDL to add/remove/rename columns...

Contributor

I should also note that support for adding/removing/renaming columns cannot be done by comparing CatalogTable instances, unless the Flink schema contains Iceberg column IDs.

The problem is clear when you consider a simple example:

  • Iceberg table schema: id bigint, a float, b float
  • Flink table schema: id bigint, x float, y float

There are two ways to get the Flink schema: rename a -> x and b -> y, or drop a, drop b, add x, add y. Guessing which one was intended by the user is not okay because it would corrupt data. If the values from a are read when projecting x after a was actually dropped, then this is a serious correctness bug.

Also note that there are some transformations that can't be detected. For example, drop a then add a. The result should be that all values of column a are discarded. This happens when the wrong data was written to a column but the column is still needed for newer data.
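To make the ambiguity concrete, a sketch of the two Iceberg schema updates that both end at id bigint, x float, y float, assuming a loaded Table named table; only one of them matches the user's intent:

import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

// Interpretation 1: the columns were renamed, so existing values stay visible under x and y.
table.updateSchema()
    .renameColumn("a", "x")
    .renameColumn("b", "y")
    .commit();

// Interpretation 2: a and b were dropped and new columns were added, so existing values are gone.
// (An alternative to the update above, not applied on top of it.)
table.updateSchema()
    .deleteColumn("a")
    .deleteColumn("b")
    .addColumn("x", Types.FloatType.get())
    .addColumn("y", Types.FloatType.get())
    .commit();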

Contributor Author
@JingsongLi Aug 31, 2020

Good point.
If there is only one operation per call, it seems feasible, and in practice each SQL DDL statement only carries a single kind of change.
But yes, this API is unclear; if we look at it from the API level alone, there are too many possibilities...
I'll add comments in the code.


oldOptions.keySet().forEach(k -> {
if (!newTable.getOptions().containsKey(k)) {
setProperties.put(k, null);
Member

Q: Does this align with the Flink SQL semantics?
I saw the document say: "Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one."

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

For the existing key-values (in the old table) which don't appear in the new table, should we remove them from the old table? (The document did not describe this case clearly, just for confirmation.)

Contributor Author

You can take a look at the tests.
The existing key-values will be kept. newTable.getOptions() is not just the options from the ALTER DDL; it has already been merged with the old options.
Actually, there is no DDL to delete a key-value either...

Member

It is already merged with old options.

If the new options are already merged with the old options, then shouldn't every key in the old options always be present in the new options? If so, there seems to be no reason for this branch.

Contributor Author

What do you mean?
The new options are the complete set of new options.
For example:
old: {'j' = 'am', 'p' = 'an'}
alter: ALTER TABLE t UNSET TBLPROPERTIES ('j')
newTable.getOptions() will be: {'p' = 'an'}
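A sketch of how a diff like this can be applied through Iceberg's UpdateProperties, using a null value to mark a removed key as in the hunk above (the method and variable names are illustrative; the PR applies the same idea inside a transaction):

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateProperties;

static void applyPropertyDiff(Table icebergTable, Map<String, String> setProperties) {
  UpdateProperties update = icebergTable.updateProperties();
  setProperties.forEach((k, v) -> {
    if (v == null) {
      update.remove(k);   // key was in the old options but not in the new ones
    } else {
      update.set(k, v);   // new or changed value
    }
  });
  update.commit();
}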

Contributor Author

I'll add a test for unsetting PROPERTIES.

Contributor

I think it is okay to diff the property sets like this, but it seems like it would be easier not to. Right now, Flink has to apply the changes, then this code diffs the property sets, then Iceberg will re-apply the changes. In addition, this model doesn't work for schema updates, as I noted above.

Contributor Author
@JingsongLi Aug 31, 2020

But property updates do not have column IDs; as long as the final state is the same, the result is the same.
Sorry, I don't get your point.

Member

setProperties.put(k, null)? The javadoc from Map says:

     * @throws NullPointerException if the specified key or value is null
     *         and this map does not permit null keys or values
     * @throws IllegalArgumentException if some property of the specified key
     *         or value prevents it from being stored in this map
     */
    V put(K key, V value);

Contributor

My point is that there isn't a correctness problem so this is okay. But, this causes Flink to do much more work because it has to apply changes from SQL, then recover those changes by comparing property maps, and pass the changes to Iceberg so that Iceberg can apply the changes. It is easier to pass the changes directly to Iceberg if the Flink API can be updated to support it.

Contributor Author

Got it, I think we can give it a try in Flink.

TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());

// NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
Member

nit: we may need to change this comment? Actually, I did not find any class named IcebergCatalogTable, or did I misunderstand something?

Contributor Author

I mean we cannot create an IcebergCatalogTable class that extends CatalogTable to carry the Iceberg Table object.

Member

Got it, maybe we could write this comment more clearly.

// NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
// catalog table.
// Let's re-loading table from Iceberg catalog when creating source/sink operators.
return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
Contributor

What is null? Could you add a comment?
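For reference, a sketch of the call with the argument labeled, assuming Flink 1.11's CatalogTableImpl, whose last constructor argument is the table comment:

// CatalogTableImpl(TableSchema tableSchema, List<String> partitionKeys,
//                  Map<String, String> properties, String comment)
return new CatalogTableImpl(schema, partitionKeys, table.properties(), null /* table comment */);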

// catalog table.
// Let's re-loading table from Iceberg catalog when creating source/sink operators.
return new CatalogTableImpl(tableSchema, table.properties(), null);
private Table getIcebergTable(ObjectPath tablePath) throws TableNotExistException {
Contributor

get is not a very specific verb. I usually prefer load for cases like this because it more accurately describes what is happening.

Contributor

I agree about get, but it does align with the interface's getTable method, which this is used in. However, it also calls out to load, so the argument that the Iceberg-specific code should stick with load rather than get is still valid.

Contributor Author
@JingsongLi Aug 31, 2020

OK, since it is an Iceberg table, I'll change it to load.

}

private static void validateFlinkTable(CatalogBaseTable table) {
Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
Contributor

Is there a case where CatalogBaseTable doesn't implement CatalogTable?

Contributor

CatalogTable is a subinterface that inherits from CatalogBaseTable. So definitely, yes.

See the java docs on the current CatalogBaseTable in Flink:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/catalog/CatalogBaseTable.html

Contributor Author

It could be a CatalogView. The Iceberg catalog does not support views, so if a view shows up here, it should be a bug...

throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
}

if (schema.getPrimaryKey().isPresent()) {
Contributor

Is this something we should add to Iceberg for Flink use cases? What does Flink use the primary key for?

Contributor

I found this which might answer your question: https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API

In particular, here are the proposed changes:

Proposed Changes
We suggest to introduce the concept of primary key constraint as a hint for FLINK to leverage for optimizations.

Primary key constraints tell that a column or a set of columns of a table or a view are unique and they do not contain null.
Neither of columns in a primary can be nullable.
Primary key therefore uniquely identify a row in a table.

So it sounds just like an RDBMS primary key.

Note however, that even in the FLIP (which is just the proposal and not necessarily the finished product), it does state that there's no planned enforcement on the PK. It's up to the user to ensure that the PK is non-null and unique.

Primary key validity checks
SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED.
This controls if the constraint checks are performed on the incoming/outgoing data.
Flink does not own the data therefore the only mode we want to support is the NOT ENFORCED mode.
Its up to the user to ensure that the query enforces key integrity.

So I agree here that throwing might be the most useful option and that there's likely nothing on the iceberg side to be added to enforce this as Flink doesn't enforce it either. In an entirely streaming setting, ensuring unique keys would be rather difficult and so to me it somewhat sounds like the PK is just more metadata that could very well be in TBLPROPERTIES.

But a more experienced Flink SQL user than myself might have more to say on the matter. I've never attempted to enforce a PK when using Flink SQL. Sounds like the work to do so would involve custom operators etc.

TLDR: The Primary Key is just a constraint, which is currently part of Flink's Table spec but goes unenforced and is up to the user. It does not appear as though the PK info is supported in any UpsertSinks etc, though that may be discussed / planned in the future. Support in the DDL for Primary Key constraints is relatively new (Flink 1.11 / current, with support in the API coming in at Flink 1.10).
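For context, a sketch of the FLIP-87 style DDL that declares such a constraint (Flink only accepts NOT ENFORCED), assuming the same TableEnvironment tEnv as above; the table is illustrative, and this catalog currently rejects the statement with the "primary key is not supported yet" error:

tEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id BIGINT NOT NULL," +
    "  amount DOUBLE," +
    "  PRIMARY KEY (order_id) NOT ENFORCED" +
    ")");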

Contributor Author
@JingsongLi Aug 31, 2020

Thanks @kbendick.
At present, in Flink, the PK is mainly used to process CDC streams.

  • For example, if a user reads a Kafka source that carries a CDC stream, the user can define a primary key. The downstream can then perform efficient dynamic table / static table conversion (restoring the original static table) according to that primary key.
  • For example, when the stream data (CDC) is written into a JDBC database, the user can define a primary key. Flink can then insert the data into the database using upsert writes.

I think if Iceberg supports native CDC processing in the future, we may be able to use it.

// Not created by Flink SQL.
// For compatibility with iceberg tables, return empty.
// TODO modify this after Flink support partition transform.
return Collections.emptyList();
Contributor

All tables with any partition transform other than identity appear to be unpartitioned? Why not return all of the identity fields at least?

Contributor

To me, it seems like adding all of the identity fields (but not the transformed fields) would likely be incorrect. Although returning an empty list when the table is partitioned seems like a possible correctness bug to me too.

Should we consider throwing an exception in this case instead until such a time that Flink supports partition transforms?

Contributor Author

+1, this is likely to be incorrect.
But I don't want to throw an exception, because then we could not read existing Iceberg tables.
Actually, the returned partition keys are useless, except that Flink can show users the meta information of the table.

All partition operations are delegated directly to the specific source/sink, so Flink does not need to see the partition information.
I tend to keep this behavior so that we can read existing Iceberg tables.

Contributor

I was thinking that since operations are delegated to Iceberg, correctness is not an issue. It would be nice to show users which columns are partition columns so they can see which ones are good candidates for query predicates. I don't think this is a blocker, though.
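A hypothetical sketch of what returning only the identity fields could look like (the helper name and the toString-based identity check are illustrative, not the code in this PR):

import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;

private static List<String> identityPartitionKeys(PartitionSpec spec, Schema schema) {
  List<String> partitionKeys = new ArrayList<>();
  for (PartitionField field : spec.fields()) {
    if ("identity".equals(field.transform().toString())) {
      // expose identity partition columns to Flink by their source column name
      partitionKeys.add(schema.findColumnName(field.sourceId()));
    }
    // bucket/truncate/time transforms are skipped until Flink SQL supports partition transforms
  }
  return partitionKeys;
}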

Contributor Author

I see, maybe we can expose this information in the table properties.

table.manageSnapshots().cherrypick(newSnapshotId).commit();
}

Transaction transaction = table.newTransaction();
Contributor

Should we do the operations above this point in the transaction as well? That seems reasonable to me. I'm not sure why we don't in other places.

Contributor Author

Looks like manageSnapshots is unsupported on a TransactionTable.

Contributor

That explains it. Thanks!

Assume.assumeFalse("HadoopCatalog does not support relocate table", isHadoopCatalog);

tEnv.executeSql("CREATE TABLE tl(id BIGINT)");
tEnv.executeSql("ALTER TABLE tl SET('location'='/tmp/location')");
Contributor

Does Flink support the LOCATION clause, or just the table property?

Contributor
@kbendick Aug 30, 2020

EDIT: In the CREATE TABLE documentation, I cannot find a reference to LOCATION outside of table properties: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table

Contributor

However, looking ahead at the Flink 1.12 prerelease docs, it appears that there's added support for a Hive dialect which supports the LOCATION clause.

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#create-1

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
  [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  [
    [ROW FORMAT row_format]
    [STORED AS file_format]
  ]
  [LOCATION fs_path]
  [TBLPROPERTIES (property_name=property_value, ...)]

Contributor Author

Yes, just the table property...
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
The Flink filesystem connector also supports a location, via the path property.
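For comparison, a sketch of the Flink 1.11 filesystem connector declaring its location through the path option (the path and format values are illustrative):

tEnv.executeSql(
    "CREATE TABLE fs_table (id BIGINT, data STRING) WITH (" +
    "  'connector'='filesystem'," +
    "  'path'='file:///tmp/fs_table'," +
    "  'format'='csv'" +
    ")");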

Contributor Author

The Hive dialect should only work for Hive tables. There are lots of tricky things...

@rdblue
Contributor

rdblue commented Aug 28, 2020

This looks mostly good to me. I just have a few questions.

if ("location".equalsIgnoreCase(entry.getKey())) {
location = entry.getValue();
} else {
properties.put(entry.getKey(), entry.getValue());
Contributor

Should location still be placed in the table properties or will that cause some kind of conflict / error?

Contributor Author

I think there is no conflict/error, but it is good to avoid the duplicate storage, since Iceberg already saves this information.

Contributor

I'd prefer not to duplicate it in table properties. Then we would have to worry about keeping the two in sync.

// don't allow setting the snapshot and picking a commit at the same time because order is ambiguous and choosing
// one order leads to different results
Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
"Cannot set the current the current snapshot ID and cherry-pick snapshot changes");
Contributor

Nit: Duplication of the words the current in the Preconditions string. It currently reads Cannot set the current the current.
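A sketch with the duplicated words removed, plus how the two settings could be applied, assuming Long values setSnapshotId and pickSnapshotId parsed from the table properties as in the hunk above:

// don't allow setting the snapshot and cherry-picking a commit at the same time, because the
// order is ambiguous and choosing one order leads to different results
Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
    "Cannot set the current snapshot ID and cherry-pick snapshot changes at the same time");

if (setSnapshotId != null) {
  table.manageSnapshots().setCurrentSnapshot(setSnapshotId).commit();
} else if (pickSnapshotId != null) {
  // cherry-pick applies the changes from the given snapshot as a new commit on the current state
  table.manageSnapshots().cherrypick(pickSnapshotId).commit();
}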

@JingsongLi
Contributor Author

JingsongLi commented Aug 31, 2020

Thanks a lot for your review~ @rdblue and @kbendick I'll take a look~

}

private CatalogTable toCatalogTable(Table table) {
TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
Member

nit: how about making the Iceberg schema -> Flink TableSchema conversion a static method inside FlinkSchemaUtil? The table sink PR #1348 will also depend on this static method (https://github.com/apache/iceberg/pull/1348/files#diff-0ad7dfff9cfa32fbb760796d976fd650R50).

Contributor Author

static looks good to me, but I think we can keep it in this class.
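A sketch of the static helper being discussed (the name and its final home, FlinkSchemaUtil or this class, are up for debate); it simply wraps the two existing calls:

import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

static TableSchema toTableSchema(Schema icebergSchema) {
  // Iceberg Schema -> Flink RowType -> Flink TableSchema, the same conversion used in toCatalogTable above
  return FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema));
}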


throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
}
}

Member

Do we need to add a TODO indicating that Flink only supports identity partitioning now but will support hidden partitioning in the future?

Contributor Author

There is already a TODO in toPartitionKeys.


if (!setProperties.isEmpty()) {
UpdateProperties updateProperties = transaction.updateProperties();
setProperties.forEach((k, v) -> {
if (v == null) {
Member

The v should never be null in a HashMap?

Contributor Author

HashMap allows nulls:

 * Hash table based implementation of the <tt>Map</tt> interface.  This
 * implementation provides all of the optional map operations, and permits
 * <tt>null</tt> values and the <tt>null</tt> key.  (The <tt>HashMap</tt>
 * class is roughly equivalent to <tt>Hashtable</tt>, except that it is
 * unsynchronized and permits nulls.)

@rdblue merged commit d4dee8e into apache:master Sep 2, 2020
@rdblue
Contributor

rdblue commented Sep 2, 2020

+1

Thanks @JingsongLi, I merged this to master. And thanks to @openinx and @kbendick for reviewing!

@JingsongLi deleted the ddl branch November 5, 2020 09:40