
Conversation

@openinx (Member) commented Aug 17, 2020

This patch wraps Flink's DataStream as a StreamTable, which allows users to use SQL to insert records into an Iceberg table; it aims to provide an experience similar to Spark SQL. Currently, this patch depends on #1185.
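For illustration, a minimal sketch of the user experience this patch aims for, using the Flink 1.11 Table API; the catalog name, database, table, and view names below are hypothetical:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSqlInsertSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    // Wrap an arbitrary DataStream as a temporary table.
    DataStream<Row> stream = env
        .fromElements(Row.of(1, "a"), Row.of(2, "b"))
        .returns(Types.ROW(Types.INT, Types.STRING));
    tEnv.createTemporaryView("source_table", tEnv.fromDataStream(stream, $("id"), $("data")));

    // Insert into an Iceberg table registered through the Flink catalog.
    tEnv.executeSql("INSERT INTO iceberg_catalog.db.sample SELECT id, data FROM source_table");
  }
}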

@rdblue (Contributor) commented Aug 29, 2020

@openinx, can you rebase this? I think it is next up for review, right?


return returnStream.addSink(new DiscardingSink())
.name(String.format("IcebergSink %s", table.toString()))
.setParallelism(1);
Contributor

Since we are already returning the DataStream, would it make sense to avoid the discarding sink and possibly let people stream the iceberg commit files instead? Like what if I wanted to also feed them into kafka?

Member Author

You mean you want to feed the committed data files to Kafka? Is that meaningful for users? It would be easier to justify if we had such use cases, I guess.

Some context

In the first sink version, I made IcebergFilesCommitter implement SinkFunction, so we could chain the function via addSink directly. But we found that did not work for bounded streams, because there was no interface/method to indicate that the stream is bounded, so we had no way to commit the remaining data files to the Iceberg table when the stream reached its end. So we had to switch to AbstractStreamOperator and implement the BoundedOneInput interface. Finally, in this version we transform the data stream twice (first: RowData -> DataFile, second: DataFile -> Void) and then add a discarding sink.
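To make that concrete, here is a minimal, self-contained sketch of the pattern (not the actual classes from this patch): an AbstractStreamOperator that implements BoundedOneInput, so it receives endInput() on bounded streams, chained via transform() and terminated with a DiscardingSink. The class and variable names are illustrative only.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class BoundedCommitterSketch {

  /** Stands in for the committer: buffers "files" and commits them at checkpoints and at end of input. */
  static class CommitterOperator extends AbstractStreamOperator<Void>
      implements OneInputStreamOperator<String, Void>, BoundedOneInput {

    @Override
    public void processElement(StreamRecord<String> element) {
      // buffer the incoming "data file" until the next commit point
    }

    @Override
    public void endInput() {
      // called only for bounded input: flush and commit the remaining files here
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> dataFiles = env.fromElements("file-1", "file-2");

    dataFiles
        .transform("Committer", Types.VOID, new CommitterOperator())
        .setParallelism(1)
        // a sink is still needed to terminate the pipeline, so a no-op sink is attached
        .addSink(new DiscardingSink<>())
        .setParallelism(1);

    env.execute();
  }
}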

@openinx (Member Author) commented Aug 31, 2020

@rdblue Yeah, thanks for merging, let me rebase this.

@openinx (Member Author) commented Sep 1, 2020

Ping @rdblue @JingsongLi for review, thanks.

TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath);
try {
Table table = catalog.loadIcebergTable(objectPath);
return new IcebergTableSink(icebergIdentifier, table,
Contributor

I think it is better to just pass a table loader to the sink; the source and sink can then reuse this loader creation function, just like in:
https://github.com/apache/iceberg/pull/1293/files#diff-0ad7dfff9cfa32fbb760796d976fd650R61
What do you think?
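A rough sketch of that suggestion; TableLoader.fromCatalog follows the linked PR, while the IcebergTableSink constructor arguments shown here are just assumptions for illustration:

TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, catalog.toIdentifier(objectPath));
// hypothetical constructor: the sink only needs a way to (re)load the table, not the identifier
return new IcebergTableSink(isStreaming, tableLoader, table.schema());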

Member Author

Makes sense to me. We also don't need to pass the icebergIdentifier to IcebergTableSink, which makes the code simpler.


@Override
public TableSink<RowData> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
if (!Arrays.equals(tableSchema.getFieldNames(), fieldNames)) {
Contributor

This is a deprecated method that no one will call; you can just return this.
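A tiny sketch of that suggestion, keeping the signature from the quoted diff:

@Override
public TableSink<RowData> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
  // Deprecated hook that the Blink planner no longer calls; keep the sink unchanged.
  return this;
}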

Member Author

OK, I see. Will do.

EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
Contributor

Can we use Parameterized for batch too?

Member Author

That's a great idea, we could reuse almost all of the code then.
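A hedged sketch of what that parameterization could look like; the class name, parameter wiring, and test body below are illustrative rather than the exact code in this patch:

import java.util.Arrays;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkTableSinkModes {

  @Parameterized.Parameters(name = "isStreaming={0}")
  public static Iterable<Object[]> parameters() {
    return Arrays.asList(new Object[] {true}, new Object[] {false});
  }

  private final boolean isStreaming;

  public TestFlinkTableSinkModes(boolean isStreaming) {
    this.isStreaming = isStreaming;
  }

  private TableEnvironment getTableEnv() {
    EnvironmentSettings.Builder settings = EnvironmentSettings.newInstance().useBlinkPlanner();
    return TableEnvironment.create(
        isStreaming ? settings.inStreamingMode().build() : settings.inBatchMode().build());
  }

  @Test
  public void testInsertInto() {
    TableEnvironment tEnv = getTableEnv();
    // the rest of the test body is identical for streaming and batch modes
  }
}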

tEnv.executeSql(String.format("create catalog iceberg_catalog with (" +
"'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", warehouse));
tEnv.executeSql("use catalog iceberg_catalog");
tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
Contributor

Looks like there are no dynamic table options (table hints) used here.

Member Author

OK, it could be removed now.

DataStream<RowData> stream = generateInputStream(rows);

// Register the rows into a temporary table named 'sourceTable'.
tEnv.createTemporaryView("sourceTable", tEnv.fromDataStream(stream, $("id"), $("data")));
Contributor

Can we use TableEnvironment.fromValues?
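For reference, a small self-contained sketch of the fromValues alternative (Flink 1.11 Table API); the view name and column types just mirror the quoted test code:

import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FromValuesSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    // Build the source rows directly from literals instead of wrapping a DataStream.
    Table source = tEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("id", DataTypes.INT()),
            DataTypes.FIELD("data", DataTypes.STRING())),
        row(1, "hello"),
        row(2, "world"));
    tEnv.createTemporaryView("sourceTable", source);
  }
}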

SimpleDataUtil.assertTableRecords(warehouse.concat("/default/sourceTable"), expected);
}

private static void waitComplete(TableResult result) {
Contributor

You can just add a method like:

  def execInsertSqlAndWaitResult(tEnv: TableEnvironment, insert: String): JobExecutionResult = {
    tEnv.executeSql(insert).getJobClient.get
      .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
      .get
  }
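The same helper in Java, assuming the Flink 1.11 JobClient API where getJobExecutionResult still takes a ClassLoader:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.table.api.TableEnvironment;

public class SqlTestUtil {
  public static JobExecutionResult execInsertSqlAndWaitResult(TableEnvironment tEnv, String insert)
      throws Exception {
    // Submit the INSERT and block until the job finishes.
    return tEnv.executeSql(insert)
        .getJobClient()
        .get()
        .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
        .get();
  }
}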

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergTableSink implements AppendStreamTableSink<RowData> {
Contributor

We can add TODOs for these interfaces:
- Implement OverwritableTableSink, so that in Flink SQL users can write: INSERT OVERWRITE t ...
- Implement PartitionableTableSink, so that users can write: INSERT OVERWRITE/INTO t PARTITION(...)
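A rough skeleton of what implementing those two interfaces could look like, using the Flink 1.11 legacy sink interfaces; the field names and the abstract class shape are illustrative only:

import java.util.Map;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;

public abstract class IcebergTableSinkSketch
    implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {

  private boolean overwrite = false;
  private Map<String, String> staticPartitions;

  @Override
  public void setOverwrite(boolean overwrite) {
    // set by the planner when the statement is INSERT OVERWRITE
    this.overwrite = overwrite;
  }

  @Override
  public void setStaticPartition(Map<String, String> partitions) {
    // set by the planner for INSERT ... PARTITION(part='value') clauses
    this.staticPartitions = partitions;
  }
}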

Member Author

Thanks for the reminder. Rather than only adding the TODO comments, I will try to implement those two interfaces in the next patch.

import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkTableSink extends AbstractTestBase {
Member Author

Thinking about this unit test again, we'd better extend FlinkCatalogTestBase so that we can cover both the Hive and Hadoop catalog cases.

sql("USE %s", DATABASE);

Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
this.icebergTable = validationCatalog
Member Author

We could use Flink DDL to create the table here once #1393 is merged.

Contributor

It was merged!
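For illustration, a hedged sketch of the Flink DDL version, using the sql() helper from the test base; TABLE_NAME is a hypothetical constant, and passing the write format through the WITH clause is an assumption:

sql("CREATE TABLE %s (id INT, data STRING) WITH ('write.format.default'='%s')",
    TABLE_NAME, format.name());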

try (TableLoader loader = tableLoader) {
this.table = loader.loadTable();
} catch (IOException e) {
throw new UncheckedIOException("Failed to load iceberg table.", e);
Contributor

Minor: it would be nice to have more context here. Maybe the table loader should define a toString that could be used in the error message here.

Member Author

Defining the toString sounds good to me.
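A small sketch of the resulting error message; the assumption here is that TableLoader implementations override toString() with their catalog or location details:

try (TableLoader loader = tableLoader) {
  this.table = loader.loadTable();
} catch (IOException e) {
  // the loader's toString gives the reader the catalog/path context for the failure
  throw new UncheckedIOException("Failed to load iceberg table from " + tableLoader, e);
}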

}

private void replacePartitions(List<DataFile> dataFiles, long checkpointId) {
ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
Contributor

I just want to note that we don't encourage the use of ReplacePartitions because the data it deletes is implicit. It is better to specify what data should be overwritten, like in the new API for Spark:

df.writeTo("iceberg.db.table").overwrite($"date" === "2020-09-01")

If Flink's semantics are to replace partitions for overwrite, then it should be okay. But I highly recommend being more explicit about data replacement.
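For comparison, a sketch of the more explicit Iceberg overwrite API mentioned above (Table.newOverwrite() with a row filter), applied to the same table and dataFiles as in the surrounding committer code; the filter column and value are illustrative:

OverwriteFiles overwrite = table.newOverwrite()
    .overwriteByRowFilter(Expressions.equal("date", "2020-09-01"));
for (DataFile file : dataFiles) {
  overwrite.addFile(file);
}
overwrite.commit();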

@openinx (Member Author) Sep 3, 2020

Yes, Flink's semantics are to replace partitions for overwrite; see the references: 1, 2

Table newTable = new HadoopTables().load(tablePath);
try (CloseableIterable<Record> iterable = IcebergGenerics.read(newTable).build()) {
public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
table.refresh();
Contributor

Can this be done automatically when a write completes, or is this a completely separate copy of the table?

Member Author

Since we don't support scanning the table via Flink SQL yet, we have to read records from the Iceberg table with the Iceberg Java API in the unit tests. In this test we get the icebergTable instance first; the following test methods then commit to the Iceberg table via Flink SQL, so icebergTable needs a refresh to pick up the latest changes.

}

@Test
public void testOverwriteTable() throws Exception {
Contributor

It would be good to also have a partitioned test.
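A hedged sketch of such a partitioned overwrite test, in the style of the existing tests; the table name, helper methods, and SQL below are illustrative:

@Test
public void testOverwritePartitionedTable() throws Exception {
  sql("CREATE TABLE %s (id INT, data STRING) PARTITIONED BY (data)", TABLE_NAME);
  execInsertSqlAndWaitResult(tEnv,
      String.format("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", TABLE_NAME));
  execInsertSqlAndWaitResult(tEnv,
      String.format("INSERT OVERWRITE %s PARTITION (data='a') SELECT 3", TABLE_NAME));
  // expect the data='a' partition to be replaced with the single row (3, 'a'),
  // while the data='b' partition stays untouched
}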

@rdblue (Contributor) commented Sep 2, 2020

+1 overall. I'd merge this now even with a couple of minor comments, but it appears that merging #1393 caused conflicts.

@openinx (Member Author) commented Sep 3, 2020

The broken unit test on JDK 11 is (JDK 8 works fine):

org.apache.iceberg.spark.sql.TestCreateTableAsSelect > testDataFrameV2Replace[1] FAILED
    java.lang.AssertionError: Should have rows matching the source table: number of results should match expected:<3> but was:<6>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.apache.iceberg.spark.SparkTestBase.assertEquals(SparkTestBase.java:100)
        at org.apache.iceberg.spark.sql.TestCreateTableAsSelect.testDataFrameV2Replace(TestCreateTableAsSelect.java:206)

openinx closed this Sep 3, 2020
openinx reopened this Sep 3, 2020
rdblue merged commit f153349 into apache:master Sep 3, 2020
@rdblue (Contributor) commented Sep 3, 2020

Merged! Thanks for getting this done, @openinx! It's great to see Flink SQL writes working.

rdblue added this to the Java 0.10.0 Release milestone Nov 16, 2020
