
Conversation

@openinx
Member

@openinx openinx commented Apr 2, 2021

This is built on top of #2354

How to export a MySQL CDC stream into an Apache Iceberg table with a Flink streaming job?

Preparation

Since we will define the Apache Iceberg sink table in a Flink + Hive catalog, you will need to follow all the steps described here.

On the other side, we will define a MySQL CDC table in the catalog, so we need to download the flink-sql-connector-mysql-cdc-1.2.0.jar from here and put it under the $FLINK_HOME/lib directory:

wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.2.0/flink-sql-connector-mysql-cdc-1.2.0.jar
cp flink-sql-connector-mysql-cdc-1.2.0.jar $FLINK_HOME/lib/

Define the MySQL source table

You need to start a local MySQL server and enable the binlog (please follow the documentation). Then create a test database in your MySQL server and run the sysbench tool to initialize the tables:

sysbench --threads=10 oltp_common \
    --tables=10 \
    --table-size=10000 \
    --mysql-host=localhost \
    --mysql-user=<your-mysql-user> \
    --mysql-password=<your-mysql-password> \
    --mysql-db=test \
    prepare

After that you can see the MySQL tables (sbtest1 through sbtest10) under the test database. Next, open your Flink SQL client shell with ./bin/sql-client.sh embedded and create a MySQL CDC source table:

CREATE TABLE sbtest1(
  `id`  INT NOT NULL,
  `k`   INT NOT NULL,
  `c`   CHAR(120) NOT NULL,
  `pad` char(60) NOT NULL 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = '<your-mysql-user>',
  'password' = '<your-mysql-password>',
  'database-name' = 'test',
  'table-name' = 'sbtest1'
);

Define the Apache Iceberg sink table

To create the Apache Iceberg sink table, execute the following Flink SQL in your Flink SQL client:

(For how to configure the Hive catalog, you may want to read the Iceberg documentation.)

CREATE CATALOG hive_catalog WITH (
   'type'='iceberg',
   'catalog-type'='hive',
   'uri'='thrift://localhost:9083',
   'clients'='5',
   'property-version'='1',
   'warehouse'='file:///Users/openinx/test/iceberg-warehouse'
);
USE hive_catalog;

CREATE DATABASE mysql_db;
USE mysql_db;

CREATE TABLE iceberg_sbtest1(
  `id`  INT NOT NULL,
  `k`   INT NOT NULL,
  `c`   CHAR(120) NOT NULL,
  `pad` char(60) NOT NULL,
  PRIMARY KEY(id) NOT ENFORCED
);

Submit a Flink streaming job to export the changelog from the MySQL server to the Iceberg table

INSERT INTO iceberg_sbtest1 SELECT * FROM default_catalog.default_database.sbtest1 ;

Then you can write a few records into the source MySQL table sbtest1 with sysbench:

sysbench --mysql-host=localhost \
    --mysql-user=<your-mysql-user> \
    --mysql-password=<your-mysql-password>  \
    --mysql-db=test \
    --threads=1 \
    --rate=52 \
    --time=1800 \
    --report-interval=10 \
    oltp_insert \
    --table_size=60000000 \
    --skip_trx=on \
    run

Verify that the records match between the MySQL table and the Iceberg table:

At some point, we kill the sysbench process and verify the record counts.

In the MySQL table:

mysql> select count(*) from sbtest1 ; 
+----------+
| count(*) |
+----------+
|    54358 |
+----------+
1 row in set (0.01 sec)

In the Iceberg table:

Flink SQL> select count(*) from iceberg_sbtest1;
+--------+
| EXPR$0 |
+--------+
|  54358 |
+--------+
1 row in set

@openinx
Member Author

openinx commented Apr 2, 2021

If we want to sync deletions from the MySQL binlog into the Apache Iceberg table, we need to upgrade the Iceberg table from format version 1 to version 2 with the following code, since we don't expose v2 to end users yet.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class TestIceberg {

  private TestIceberg() {
  }

  public static void main(String[] args) {

    Map<String, String> properties = Maps.newHashMap();
    properties.put("type", "iceberg");
    properties.put("catalog-type", "hive");
    properties.put("uri", "thrift://localhost:9083");
    properties.put("clients", "5");
    properties.put("property-version", "1");
    properties.put("warehouse", "file:///Users/openinx/test/iceberg-warehouse");

    CatalogLoader loader = CatalogLoader.hive("hive_catalog", new Configuration(), properties);
    Table table = loader.loadCatalog().loadTable(TableIdentifier.of("mysql_db", "iceberg_sbtest1"));

    TableOperations ops = ((BaseTable) table).operations();
    TableMetadata meta = ops.current();
    ops.commit(meta, meta.upgradeToFormatVersion(2));
  }
}
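
As an optional sanity check (not part of the snippet above, just a suggestion), you could append something like this at the end of main() to confirm the upgrade took effect:

    TableMetadata refreshed = ops.refresh();
    System.out.println("Table format version: " + refreshed.formatVersion());  // expected to print 2 after the upgrade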

Then we can use sysbench to update the records in the MySQL server:

sysbench --mysql-host=localhost \
    --mysql-user=<your-mysql-user> \
    --mysql-password=<your-mysql-password>  \
    --mysql-db=test \
    --threads=1 \
    --rate=52 \
    --time=1800 \
    --report-interval=10 \
    oltp_update_index \
    --table_size=60000000 \
    --skip_trx=on \
    run

@zhangwcoder

@openinx Hi, I'm following this feature; when will it be completed?
Will it be released in version 0.12.0? This is my issue #2409; I'm focused on syncing data from MySQL to an Iceberg table in a Flink streaming job.

@edgarRd
Contributor

edgarRd commented Apr 2, 2021

@openinx Would it be possible to set the base branch to jackye1995:row-id-api for now to see the diff that applies to your branch only? Thanks!

@openinx
Member Author

openinx commented Apr 6, 2021

@edgarRd You could see the diff here.

@zhangwcoder From my understanding since the community's last sync, we will still not expose v2 to end users, but there are some great features that will be merged in the next few weeks, such as rewriting delete files.

@openinx openinx marked this pull request as ready for review May 6, 2021 08:21
@dixingxing0
Contributor

Is anyone reviewing this PR? It seems that once this PR is merged it will be the first available version for CDC stream writing, which will be of great help to Iceberg users who want to use the upsert feature. Personally I think this PR should be merged before the 0.12 release. @rdblue @stevenzwu @openinx

@jackye1995
Contributor

@dixingxing0 yeah I think this should definitely be a part of 0.12.0 milestone. It's late night at my timezone, I am planning to review tomorrow.

@dixingxing0
Contributor

@dixingxing0 yeah I think this should definitely be a part of 0.12.0 milestone. It's late night at my timezone, I am planning to review tomorrow.

Thanks @jackye1995, glad to hear that!

}

public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
table.refresh();
Contributor

the method below already calls table.refresh(). seems redundant here.

Member Author

This table.refresh() is here to get the latest snapshot id, so that the following actualRowSet call can read the records from the latest snapshot.

Contributor

Got it. I just found it a little weird that we have to call table.refresh twice in this case. I think your intention is to support actualRowSet for either the latest snapshot or a specific snapshotId. Maybe define the overloaded method as below; then this method can just pass in null for the snapshotId.

actualRowSet(Table table, @Nullable Long snapshotId, String... columns) 
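
A rough sketch of that overload (illustration only; it assumes the generic reader utilities IcebergGenerics / Record / CloseableIterable / StructLikeSet and javax.annotation.Nullable, and the PR's final code may differ):

  public static StructLikeSet actualRowSet(Table table, @Nullable Long snapshotId, String... columns)
      throws IOException {
    table.refresh();
    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
    IcebergGenerics.ScanBuilder builder = IcebergGenerics.read(table).select(columns);
    if (snapshotId != null) {
      // Read from the requested snapshot; otherwise the scan uses the current (just refreshed) one.
      builder = builder.useSnapshot(snapshotId);
    }
    try (CloseableIterable<Record> records = builder.build()) {
      records.forEach(set::add);
    }
    return set;
  }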

Member Author

Yeah, I like your idea!

if (schema.getPrimaryKey().isPresent()) {
for (String column : schema.getPrimaryKey().get().getColumns()) {
Types.NestedField field = iSchema.findField(column);
Preconditions.checkNotNull(field, "Column %s does not found in schema %s", column, iSchema);
Contributor

nit: error message format should follow Cannot ..., such as Cannot find field ID for primary key column %s in schema %s
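
Applied to the context line above, the suggestion would look something like this (wording taken from the comment, not from the final code):

    Preconditions.checkNotNull(field,
        "Cannot find field ID for primary key column %s in schema %s", column, iSchema);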

return freshIdentifierFieldIds(iSchema, schema);
}

private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {
Contributor

instead of having this method, if we expose a TypeUtil.freshIdentifierFieldIds(Schema iSchema, Schema base), can we do a conversion of Flink to Iceberg schema and then call that method?

Member Author

I don't think we can reuse TypeUtil.refreshIdentifierFields(Types.StructType freshSchema, Schema baseSchema), because in that method we already have an existing identifier field id list inside the baseSchema. In this case, the identifier fields actually come from Flink's TableSchema; converting Flink's TableSchema with primary keys into an Iceberg schema with an identifier field id list is exactly what we are trying to accomplish in this method.
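
Pieced together from the diff context above, the conversion is roughly the following (a sketch, not necessarily the PR's exact code; it assumes the Schema constructor that accepts an identifier field id set, plus Sets from the relocated Guava):

  private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {
    // Collect the iceberg field ids of the Flink primary key columns.
    Set<Integer> identifierFieldIds = Sets.newHashSet();
    if (schema.getPrimaryKey().isPresent()) {
      for (String column : schema.getPrimaryKey().get().getColumns()) {
        Types.NestedField field = iSchema.findField(column);
        Preconditions.checkNotNull(field, "Cannot find column %s in schema %s", column, iSchema);
        identifierFieldIds.add(field.fieldId());
      }
    }
    // Rebuild the schema with the same columns plus the identifier field id list.
    return new Schema(iSchema.columns(), identifierFieldIds);
  }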

}

@Test
public void testPureInsertOnIdKey() throws Exception {
Contributor

@jackye1995 jackye1995 May 10, 2021

Why do we need an insert-only test? The tests above should be able to cover this case. If we really want to test comprehensively, I think we should also have tests for insert + delete only and insert + update (before only, after only, before + after) only.

Member Author

Because I was trying to cover the case where we only write inserts in format v2 (without appending any deletes, pos-deletes or eq-deletes, to the posDeleteWriter and eqDeleteWriter). The process of writing pure inserts in v2 (with an identifier field id list) is actually different from the process in v1, or from the v2 process without an identifier field id list, because we need to maintain the insertedRowMap to deduplicate the same keys within the same txn.
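
Conceptually, the extra bookkeeping for pure inserts under v2 with identifier fields looks something like the following (an illustrative sketch only; all helper names besides insertedRowMap and posDeleteWriter mentioned above are hypothetical):

    StructLike key = keyOf(row);                        // hypothetical: project the identifier fields of the row
    PathOffset previous = insertedRowMap.get(key);      // PathOffset is hypothetical: data file path + row offset
    if (previous != null) {
      // The same key was already written in this txn, so pos-delete the earlier copy.
      posDeleteWriter.delete(previous.path(), previous.offset());
    }
    insertedRowMap.put(key, currentPathOffset());       // hypothetical: remember where this row was written
    dataWriter.write(row);                              // hypothetical: append the insert to the current data file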

}

@Test
public void testCreateTableWithPrimaryKey() throws Exception {
Contributor

is there an update primary key syntax in flink?

Member Author

Apache Flink doesn't support altering the primary key now, but there is a proposal to extend Flink's ALTER TABLE. Maybe it will be supported in a following release.

}

private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
"+I", RowKind.INSERT,
Contributor

Instead of having these strings mapped via a single row method, which results in a ton of static strings in tests, I think we can use a method-based approach and have 4 methods, insertRow, deleteRow, updateRowBefore and updateRowAfter, because there are only 4 cases and we don't really expect them to grow.

The methods can use a signature like insertRow(Object... values), just like the Row.ofKind method, to be more flexible.
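
A sketch of those helpers, based on org.apache.flink.types.Row and RowKind (the final code may name them slightly differently than the suggestion above):

  protected static Row insertRow(Object... values) {
    return Row.ofKind(RowKind.INSERT, values);
  }

  protected static Row deleteRow(Object... values) {
    return Row.ofKind(RowKind.DELETE, values);
  }

  protected static Row updateBeforeRow(Object... values) {
    return Row.ofKind(RowKind.UPDATE_BEFORE, values);
  }

  protected static Row updateAfterRow(Object... values) {
    return Row.ofKind(RowKind.UPDATE_AFTER, values);
  }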

Member Author

Sounds good to me!

import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestChangeLogTable extends ChangeLogTableTestBase {
Contributor

It seems there could be many combinations of changelog sequences. How do we know that we covered the meaningful combinations?

Member Author

Actually we cannot cover all the combinations in this unit test, but I think we can cover the critical cases in this class:

  1. The first dimension is the primary key columns (PK); we choose to cover three cases:
    a) id as PK;
    b) data as PK;
    c) id,data as PK;
    Different primary key combinations lead to different paths for generating the key of a row. We need to ensure that data correctness is not affected by the key combination.

  2. The second dimension is: what are the correct results after applying INSERT, DELETE, UPDATE within the same transaction? The same key may have different versions of INSERT, DELETE, UPDATE. We need to ensure that the various insert/delete/update combinations within a given txn do not affect correctness.

  3. The third dimension is: what are the correct results after applying INSERT/DELETE/UPDATE across different transactions? We need to ensure that the various insert/delete/update combinations between different txns do not affect correctness.

This is the reason why we chose to design the following test cases (one such sequence is sketched below).
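
For example, one covered sequence could look like this (hypothetical data, using the row helpers sketched earlier and ImmutableList from the relocated Guava; each inner list is the changelog of one txn/checkpoint):

    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
        // txn 1: insert a key, then update it within the same txn
        ImmutableList.of(
            insertRow(1, "aaa"),
            updateBeforeRow(1, "aaa"),
            updateAfterRow(1, "bbb")),
        // txn 2: delete the key written by the previous txn, then re-insert it
        ImmutableList.of(
            deleteRow(1, "bbb"),
            insertRow(1, "ccc")));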

Contributor

Thanks. It might be helpful to add those as comments to give readers some context on the unit test design.

return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);
Contributor

I have a question about BoundedTestSource line 66. Why is it +2?

checkpointToAwait = numCheckpointsComplete.get() + 2;

Member Author

@openinx openinx May 13, 2021

Let's say checkpointToAwait = numCheckpointsComplete.get() + delta. In fact, the value of delta should not affect the final table records, because we only need to make sure that there will be exactly elementsPerCheckpoint.size() checkpoints to emit each record buffer from the original elementsPerCheckpoint. Even if the checkpoints that emit results are not consecutive, the correctness of the data is not affected in the end. Setting delta to 2 introduces the variation of producing non-consecutive checkpoints that emit the record buffers from elementsPerCheckpoint.

Contributor

@stevenzwu stevenzwu May 13, 2021

So we want the pattern of checkpoint cycles to look like this? The only small downside is a slightly longer runtime: now we need to wait for 6 checkpoints instead of 3.

insert batch 1, idle, insert batch 2, idle, insert batch 3, idle, ...

Regardless, it is helpful to add some comments to explain the decision. It is very difficult to know the intention of the magic number of 2 otherwise.
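
For example, a comment along these lines could be attached to that assignment (the wording is only a suggestion, based on the explanation above):

    // Wait for two more completed checkpoints before emitting the next batch, so that
    // batches land on non-consecutive checkpoints. Data correctness only requires one
    // emitted batch per entry in elementsPerCheckpoint, not consecutive checkpoints.
    checkpointToAwait = numCheckpointsComplete.get() + 2;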


@Test
public void testConvertFlinkSchemaWithPrimaryKeys() {
Schema iSchema = new Schema(
Contributor

nit: iSchema -> icebergSchema? It took me a few seconds to guess what i stands for here :)

return Row.ofKind(RowKind.UPDATE_AFTER, values);
}

protected static <T> List<T> listJoin(List<List<T>> lists) {
Contributor

nit: is this a tiny bit simpler?

return lists.stream()
        .flatMap(List::stream)
        .collect(Collectors.toList());

@openinx
Member Author

openinx commented May 18, 2021

@stevenzwu @jackye1995, any other concerns?

Contributor

@jackye1995 jackye1995 left a comment

looks good to me!

@openinx openinx merged commit a0d6cdf into apache:master May 18, 2021
@openinx
Member Author

openinx commented May 18, 2021

We got one +1 from an Apache Flink expert (@stevenzwu) and one +1 from the Iceberg community (@jackye1995), and there are no other available Iceberg committers with a good Flink + Iceberg background right now, so I've merged this PR rather than block this feature for too long. If anyone has other concerns, we can discuss them in the next PR. Thanks.
