Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory {

private ConcurrentLinkedQueue<HistoryRecord> records;
private String instanceName;
private boolean databaseexists;

/**
* Registers the given HistoryRecords into global variable under the given instance name,
Expand Down Expand Up @@ -104,6 +105,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
throw new IllegalStateException(
String.format("Couldn't find engine instance %s in the global records.", instanceName));
}
this.databaseexists = config.getBoolean("database.history.exists", true);
}

@Override
Expand All @@ -129,7 +131,7 @@ protected void recoverRecords(Consumer<HistoryRecord> records) {

@Override
public boolean exists() {
return true;
return databaseexists;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@

package com.alibaba.ververica.cdc.connectors.mysql;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumState;
import com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
import io.debezium.connector.mysql.MySqlConnector;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -49,6 +56,8 @@ public static class Builder<T> {
private String serverTimeZone;
private String[] tableList;
private Properties dbzProperties;
private String sourceOffsetFile;
private Integer sourceOffsetPosition;
private DebeziumDeserializationSchema<T> deserializer;

public Builder<T> hostname(String hostname) {
Expand Down Expand Up @@ -138,6 +147,22 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
return this;
}

/**
* Sets the MySql source offset file name.
*/
public Builder<T> sourceOffsetFile(String sourceOffsetFile) {
this.sourceOffsetFile = sourceOffsetFile;
return this;
}

/**
* Sets the MySql source offset position.
*/
public Builder<T> sourceOffsetPosition(Integer sourceOffsetPosition) {
this.sourceOffsetPosition = sourceOffsetPosition;
return this;
}

public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
Expand Down Expand Up @@ -165,6 +190,30 @@ public DebeziumSourceFunction<T> build() {
if (serverTimeZone != null) {
props.setProperty("database.serverTimezone", serverTimeZone);
}
if (sourceOffsetFile != null && sourceOffsetPosition != null) {
// if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must be configured
props.setProperty("snapshot.mode", "schema_only_recovery");

DebeziumState debeziumState = new DebeziumState();
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("server", props.getProperty("database.server.name"));
debeziumState.setSourcePartition(sourcePartition);

Map<String, Object> sourceOffset = new HashMap<>();
sourceOffset.put("file", sourceOffsetFile);
sourceOffset.put("pos", sourceOffsetPosition);
debeziumState.setSourceOffset(sourceOffset);

try {
ObjectMapper objectMapper = new ObjectMapper();
String offsetJson = objectMapper.writeValueAsString(debeziumState);
// if the task is restored from savepoint, it will be overwritten by restoredOffsetState
props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, offsetJson);
props.setProperty("database.history.exists", "false");
} catch (IOException e) {
throw new RuntimeException("Can't serialize debezium offset state from Object: " + debeziumState, e);
}
}

if (dbzProperties != null) {
dbzProperties.forEach(props::put);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.ververica.cdc.connectors.mysql.options;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* Offset option for MySql.
*/
public class MySQLOffsetOptions {

@Nullable
private final String sourceOffsetFile;
@Nullable
private final Integer sourceOffsetPosition;

private MySQLOffsetOptions(@Nullable String sourceOffsetFile, @Nullable Integer sourceOffsetPosition) {
this.sourceOffsetFile = sourceOffsetFile;
this.sourceOffsetPosition = sourceOffsetPosition;
}

@Nullable
public String getSourceOffsetFile() {
return sourceOffsetFile;
}

@Nullable
public Integer getSourceOffsetPosition() {
return sourceOffsetPosition;
}

@Override
public String toString() {
return "MySQLOffsetOptions{" +
"sourceOffsetFile='" + sourceOffsetFile + '\'' +
", sourceOffsetPosition='" + sourceOffsetPosition + '\'' +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MySQLOffsetOptions that = (MySQLOffsetOptions) o;
return Objects.equals(sourceOffsetFile, that.sourceOffsetFile) &&
Objects.equals(sourceOffsetPosition, that.sourceOffsetPosition);
}

@Override
public int hashCode() {
return Objects.hash(sourceOffsetFile, sourceOffsetPosition);
}

/**
* Creates a builder of {@link MySQLOffsetOptions}.
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link MySQLOffsetOptions}.
*/
public static class Builder {

private String sourceOffsetFile;
private Integer sourceOffsetPosition;

/**
* Sets the MySql source offset file name.
*/
public Builder sourceOffsetFile(String sourceOffsetFile) {
this.sourceOffsetFile = sourceOffsetFile;
return this;
}

/**
* Sets the MySql source offset position.
*/
public Builder sourceOffsetPosition(Integer sourceOffsetPosition) {
this.sourceOffsetPosition = sourceOffsetPosition;
return this;
}

/**
* Creates an instance of {@link MySQLOffsetOptions}.
*/
public MySQLOffsetOptions build() {
return new MySQLOffsetOptions(sourceOffsetFile, sourceOffsetPosition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.types.RowKind;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class MySQLTableSource implements ScanTableSource {
private final String tableName;
private final ZoneId serverTimeZone;
private final Properties dbzProperties;
private final MySQLOffsetOptions offsetOptions;

public MySQLTableSource(
TableSchema physicalSchema,
Expand All @@ -69,7 +71,8 @@ public MySQLTableSource(
String password,
ZoneId serverTimeZone,
Properties dbzProperties,
@Nullable Integer serverId) {
@Nullable Integer serverId,
MySQLOffsetOptions offsetOptions) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
Expand All @@ -80,6 +83,7 @@ public MySQLTableSource(
this.serverId = serverId;
this.serverTimeZone = serverTimeZone;
this.dbzProperties = dbzProperties;
this.offsetOptions = offsetOptions;
}

@Override
Expand Down Expand Up @@ -111,7 +115,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.password(password)
.serverTimeZone(serverTimeZone.toString())
.debeziumProperties(dbzProperties)
.deserializer(deserializer);
.deserializer(deserializer)
.sourceOffsetFile(offsetOptions.getSourceOffsetFile())
.sourceOffsetPosition(offsetOptions.getSourceOffsetPosition());
Optional.ofNullable(serverId).ifPresent(builder::serverId);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();

Expand All @@ -121,16 +127,17 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
@Override
public DynamicTableSource copy() {
return new MySQLTableSource(
physicalSchema,
port,
hostname,
database,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
serverId
physicalSchema,
port,
hostname,
database,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
serverId,
offsetOptions
);
}

Expand All @@ -152,12 +159,13 @@ public boolean equals(Object o) {
Objects.equals(serverId, that.serverId) &&
Objects.equals(tableName, that.tableName) &&
Objects.equals(serverTimeZone, that.serverTimeZone) &&
Objects.equals(dbzProperties, that.dbzProperties);
Objects.equals(dbzProperties, that.dbzProperties) &&
Objects.equals(offsetOptions, that.offsetOptions);
}

@Override
public int hashCode() {
return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName, serverTimeZone, dbzProperties);
return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName, serverTimeZone, dbzProperties, offsetOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;

import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions;
import com.alibaba.ververica.cdc.debezium.table.DebeziumOptions;

import java.time.ZoneId;
Expand Down Expand Up @@ -85,6 +86,16 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
"MySQL database cluster as another server (with this unique ID) so it can read the binlog. " +
"By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.");

private static final ConfigOption<String> SOURCE_OFFSET_FILE = ConfigOptions.key("source-offset-file")
.stringType()
.noDefaultValue()
.withDescription("File Name of the MySQL binlog.");

private static final ConfigOption<Integer> SOURCE_OFFSET_POSITION = ConfigOptions.key("source-offset-pos")
.intType()
.noDefaultValue()
.withDescription("Position of the MySQL binlog.");

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
Expand All @@ -100,6 +111,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
Integer serverId = config.getOptional(SERVER_ID).orElse(null);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
MySQLOffsetOptions.Builder builder = MySQLOffsetOptions.builder();
builder.sourceOffsetFile(config.get(SOURCE_OFFSET_FILE))
.sourceOffsetPosition(config.getOptional(SOURCE_OFFSET_POSITION).orElse(null));

return new MySQLTableSource(
physicalSchema,
Expand All @@ -111,7 +125,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
password,
serverTimeZone,
getDebeziumProperties(context.getCatalogTable().getOptions()),
serverId
serverId,
builder.build()
);
}

Expand All @@ -137,6 +152,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PORT);
options.add(SERVER_TIME_ZONE);
options.add(SERVER_ID);
options.add(SOURCE_OFFSET_FILE);
options.add(SOURCE_OFFSET_POSITION);
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.ExceptionUtils;

import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions;
import org.junit.Test;

import java.time.ZoneId;
Expand Down Expand Up @@ -77,7 +78,8 @@ public void testCommonProperties() {
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
null
null,
MySQLOffsetOptions.builder().build()
);
assertEquals(expectedSource, actualSource);
}
Expand All @@ -103,7 +105,8 @@ public void testOptionalProperties() {
MY_PASSWORD,
ZoneId.of("Asia/Shanghai"),
dbzProperties,
4321
4321,
MySQLOffsetOptions.builder().build()
);
assertEquals(expectedSource, actualSource);
}
Expand Down