@@ -852,7 +852,12 @@ public Row build() {
throw new IllegalArgumentException(
"Row expected "
+ schema.getFieldCount()
+ " fields. initialized with "
+ String.format(
" fields (%s).",
schema.getFields().stream()
.map(Object::toString)
.collect(Collectors.joining(", ")))
+ " initialized with "
+ values.size()
+ " fields.");
}
1 change: 1 addition & 0 deletions sdks/java/io/debezium/build.gradle
@@ -43,6 +43,7 @@ dependencies {

// Test dependencies
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:jdbc")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(":runners:google-cloud-dataflow-java")
@@ -18,9 +18,7 @@
package org.apache.beam.io.debezium;

import org.apache.kafka.connect.source.SourceConnector;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.checkerframework.checker.nullness.qual.NonNull;

/** Enumeration of debezium connectors. */
public enum Connectors {
@@ -30,7 +28,6 @@ public enum Connectors {
ORACLE("Oracle", "io.debezium.connector.oracle.OracleConnector"),
DB2("DB2", "io.debezium.connector.db2.Db2Connector"),
;
private static final Logger LOG = LoggerFactory.getLogger(Connectors.class);
private final String name;
private final String connector;

@@ -45,12 +42,14 @@ public String getName() {
}

/** Returns the {@link SourceConnector} class for this Debezium connector. */
public @Nullable Class<? extends SourceConnector> getConnector() {
public @NonNull Class<? extends SourceConnector> getConnector() {
Class<? extends SourceConnector> connectorClass = null;
try {
connectorClass = (Class<? extends SourceConnector>) Class.forName(this.connector);
} catch (ClassCastException | ClassNotFoundException e) {
LOG.error("Connector class is not found", e);
throw new IllegalArgumentException(
String.format(
"Unable to resolve class %s to use as Debezium connector.", this.connector));
}
return connectorClass;
}
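
With this change, getConnector() fails fast instead of logging and returning null, so callers no longer need a null check. A minimal caller-side sketch (illustrative, not part of this diff):

// Resolution now either succeeds or throws IllegalArgumentException
// (e.g. when the Debezium MySQL connector jar is not on the classpath).
Class<? extends SourceConnector> connectorClass = Connectors.MYSQL.getConnector();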
@@ -23,20 +23,24 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,6 +144,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract @Nullable Integer getMaxNumberOfRecords();

abstract @Nullable Long getMaxTimeToRun();

abstract @Nullable Coder<T> getCoder();

abstract Builder<T> toBuilder();
@@ -154,6 +160,8 @@ abstract static class Builder<T> {

abstract Builder<T> setMaxNumberOfRecords(Integer maxNumberOfRecords);

abstract Builder<T> setMaxTimeToRun(Long milliseconds);

abstract Read<T> build();
}

@@ -202,6 +210,53 @@ public Read<T> withMaxNumberOfRecords(Integer maxNumberOfRecords) {
return toBuilder().setMaxNumberOfRecords(maxNumberOfRecords).build();
}

/**
* Once the connector has run for the given amount of time, it stops. The value can be null
* (the default), which means it will not stop. This parameter is mainly intended for testing.
*
* @param milliseconds The maximum number of milliseconds to run before stopping the connector.
* @return PTransform {@link #read}
*/
public Read<T> withMaxTimeToRun(Long milliseconds) {
return toBuilder().setMaxTimeToRun(milliseconds).build();
}

protected Schema getRecordSchema() {
KafkaSourceConsumerFn<T> fn =
new KafkaSourceConsumerFn<>(
getConnectorConfiguration().getConnectorClass().get(),
getFormatFunction(),
getMaxNumberOfRecords());
fn.register(
new KafkaSourceConsumerFn.OffsetTracker(
new KafkaSourceConsumerFn.OffsetHolder(null, null, 0)));

Map<String, String> connectorConfig =
Maps.newHashMap(getConnectorConfiguration().getConfigurationMap());
connectorConfig.put("snapshot.mode", "schema_only");
SourceRecord sampledRecord =
fn.getOneRecord(connectorConfig);
fn.reset();
Schema keySchema =
sampledRecord.keySchema() != null
? KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(sampledRecord.keySchema())
: Schema.builder().build();
Schema valueSchema =
KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(sampledRecord.valueSchema());

return Schema.builder()
.addFields(valueSchema.getFields())
.setOptions(
Schema.Options.builder()
.setOption(
"primaryKeyColumns",
Schema.FieldType.array(Schema.FieldType.STRING),
keySchema.getFields().stream()
.map(Schema.Field::getName)
.collect(Collectors.toList())))
.build();
}

@Override
public PCollection<T> expand(PBegin input) {
return input
@@ -213,7 +268,8 @@ public PCollection<T> expand(PBegin input) {
new KafkaSourceConsumerFn<>(
getConnectorConfiguration().getConnectorClass().get(),
getFormatFunction(),
getMaxNumberOfRecords())))
getMaxNumberOfRecords(),
getMaxTimeToRun())))
.setCoder(getCoder());
}
}
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.debezium;

import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A schema-aware transform provider for {@link DebeziumIO}. This class provides a {@link
* PTransform} that returns a change stream for a relational database.
*
* <p>The transform needs to access the source database <b>on expansion</b> and at <b>pipeline
* runtime</b>. At expansion, the output {@link org.apache.beam.sdk.values.PCollection} schema is
* retrieved, while at runtime, the change stream is consumed.
*
* <p>This transform is tested against <b>MySQL and Postgres</b>, but it should work well for any
* data source supported by Debezium.
*/
public class DebeziumReadSchemaTransformProvider
extends TypedSchemaTransformProvider<
DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration> {

private static final Logger LOG =
LoggerFactory.getLogger(DebeziumReadSchemaTransformProvider.class);
private final Boolean isTest;
private final Integer testLimitRecords;
private final Long testLimitMilliseconds;

DebeziumReadSchemaTransformProvider() {
this(false, -1, Long.MAX_VALUE);
}

@VisibleForTesting
protected DebeziumReadSchemaTransformProvider(
Boolean isTest, Integer recordLimit, Long timeLimitMs) {
this.isTest = isTest;
this.testLimitRecords = recordLimit;
this.testLimitMilliseconds = timeLimitMs;
}

@Override
protected @NonNull @Initialized Class<DebeziumReadSchemaTransformConfiguration>
configurationClass() {
return DebeziumReadSchemaTransformConfiguration.class;
}

@Override
protected @NonNull @Initialized SchemaTransform from(
DebeziumReadSchemaTransformConfiguration configuration) {
// TODO(pabloem): Validate configuration parameters to ensure formatting is correct.
return new SchemaTransform() {
@Override
public @UnknownKeyFor @NonNull @Initialized PTransform<
@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
buildTransform() {
return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// TODO(pabloem): Test this behavior
Collection<String> connectors =
Arrays.stream(Connectors.values())
.map(Object::toString)
.collect(Collectors.toSet());
if (!connectors.contains(configuration.getDatabase())) {
throw new IllegalArgumentException(
"Unsupported database "
+ configuration.getDatabase()
+ ". Unable to select a JDBC driver for it. Supported Databases are: "
+ String.join(", ", connectors));
}
Class<?> connectorClass =
Objects.requireNonNull(Connectors.valueOf(configuration.getDatabase()))
.getConnector();
DebeziumIO.ConnectorConfiguration connectorConfiguration =
DebeziumIO.ConnectorConfiguration.create()
.withUsername(configuration.getUsername())
.withPassword(configuration.getPassword())
.withHostName(configuration.getHost())
.withPort(Integer.toString(configuration.getPort()))
.withConnectorClass(connectorClass);
connectorConfiguration =
connectorConfiguration
.withConnectionProperty("table.include.list", configuration.getTable())
.withConnectionProperty("include.schema.changes", "false")
.withConnectionProperty("database.server.name", "beam-pipeline-server");
if (configuration.getDatabase().equals("POSTGRES")) {
LOG.info(
"As Database is POSTGRES, we set the `database.dbname` property to {}.",
configuration.getTable().substring(0, configuration.getTable().indexOf(".")));
Contributor: .indexOf(".") will return -1 if there's no "." in the table configuration field, and the substring will throw an error. If having the dot is a must, there should be a validation check in DebeziumReadSchemaTransformConfiguration for this. (A hedged sketch of such a check follows the POSTGRES block below.)
connectorConfiguration =
connectorConfiguration.withConnectionProperty(
"database.dbname",
configuration.getTable().substring(0, configuration.getTable().indexOf(".")));
}
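
A minimal sketch of the validation suggested in the comment above, assuming it lives in DebeziumReadSchemaTransformConfiguration (the validate() hook is hypothetical, not part of this PR):

// Hypothetical fail-fast check: substring(0, -1) above would otherwise throw
// a StringIndexOutOfBoundsException when the table name has no "." in it.
public void validate() {
  if (getDatabase().equals("POSTGRES") && !getTable().contains(".")) {
    throw new IllegalArgumentException(
        String.format(
            "Table '%s' must be qualified as <schema>.<table> for POSTGRES so "
                + "database.dbname can be derived from it.",
            getTable()));
  }
}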

final List<String> debeziumConnectionProperties =
configuration.getDebeziumConnectionProperties();
if (debeziumConnectionProperties != null) {
for (String connectionProperty : debeziumConnectionProperties) {
String[] parts = connectionProperty.split("=", -1);
String key = parts[0];
String value = parts[1];
Comment on lines +135 to +137:

Contributor: Similar to above, I think some validation may be needed for debeziumConnectionProperties. A customized error message would help when users input connection properties that don't follow the "abc=xyz" standard. (A hedged sketch follows the loop below.)

Author: Added a TODO. Thanks, Ahmed!

connectorConfiguration = connectorConfiguration.withConnectionProperty(key, value);
}
}
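
A hedged sketch of the property-format validation discussed above, as a variant of the loop just shown (message wording is illustrative):

// Hypothetical guard: reject entries that don't follow key=value before
// indexing into parts, so users get an actionable message instead of an
// ArrayIndexOutOfBoundsException.
for (String connectionProperty : debeziumConnectionProperties) {
  String[] parts = connectionProperty.split("=", -1);
  if (parts.length != 2 || parts[0].isEmpty()) {
    throw new IllegalArgumentException(
        String.format(
            "Invalid connection property '%s'. Expected the form key=value.",
            connectionProperty));
  }
  connectorConfiguration =
      connectorConfiguration.withConnectionProperty(parts[0], parts[1]);
}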

DebeziumIO.Read<Row> readTransform =
DebeziumIO.<Row>read().withConnectorConfiguration(connectorConfiguration);

if (isTest) {
readTransform =
readTransform
.withMaxNumberOfRecords(testLimitRecords)
.withMaxTimeToRun(testLimitMilliseconds);
}

// TODO(pabloem): Database connection issues can be debugged here.
Schema recordSchema = readTransform.getRecordSchema();
LOG.info(
"Computed schema for table {} from {}: {}",
configuration.getTable(),
configuration.getDatabase(),
recordSchema);
SourceRecordMapper<Row> formatFn =
KafkaConnectUtils.beamRowFromSourceRecordFn(recordSchema);
readTransform =
readTransform.withFormatFunction(formatFn).withCoder(RowCoder.of(recordSchema));

return PCollectionRowTuple.of("output", input.getPipeline().apply(readTransform));
}
};
}
};
}

@Override
public @NonNull @Initialized String identifier() {
return "beam:schematransform:org.apache.beam:debezium_read:v1";
}

@Override
public @NonNull @Initialized List<@NonNull @Initialized String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @NonNull @Initialized List<@NonNull @Initialized String> outputCollectionNames() {
return Collections.singletonList("output");
}

@AutoValue
public abstract static class DebeziumReadSchemaTransformConfiguration {
Contributor: I mentioned two things above that can be considered for validation; maybe there are others. This would make this SchemaTransform more user-friendly, but I think these are not immediate blockers (maybe just add a TODO).

Author: Thanks! I added a TODO. :)

public abstract String getUsername();

public abstract String getPassword();

public abstract String getHost();

public abstract Integer getPort();

public abstract String getTable();

public abstract @NonNull String getDatabase();

public abstract @Nullable List<String> getDebeziumConnectionProperties();

public static Builder builder() {
return new AutoValue_DebeziumReadSchemaTransformProvider_DebeziumReadSchemaTransformConfiguration
.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setUsername(String username);

public abstract Builder setPassword(String password);

public abstract Builder setHost(String host);

public abstract Builder setPort(Integer port);

public abstract Builder setDatabase(String database);

public abstract Builder setTable(String table);

public abstract Builder setDebeziumConnectionProperties(
List<String> debeziumConnectionProperties);

public abstract DebeziumReadSchemaTransformConfiguration build();
}
}
}
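
For reference, a minimal wiring sketch for this provider; the configuration values are placeholders, and pipeline is an existing Pipeline (from(...) and the constructor are not public, so assume this runs in the org.apache.beam.io.debezium package):

// Hypothetical usage: expand the schema transform and read the change
// stream as Rows from the "output" tag.
DebeziumReadSchemaTransformProvider provider = new DebeziumReadSchemaTransformProvider();
PCollectionRowTuple result =
    PCollectionRowTuple.empty(pipeline)
        .apply(
            provider
                .from(
                    DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration
                        .builder()
                        .setDatabase("POSTGRES")
                        .setUsername("user")
                        .setPassword("password")
                        .setHost("localhost")
                        .setPort(5432)
                        .setTable("public.my_table")
                        .build())
                .buildTransform());
PCollection<Row> changeStream = result.get("output");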