Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/actions/setup-environment-action/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ runs:

- name: Install Java
if: ${{ inputs.java-version != '' }}
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: ${{ inputs.java-version == 'default' && '11' || inputs.java-version }}
java-version: ${{ inputs.java-version == 'default' && '17' || inputs.java-version }}
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
with:
Expand Down
21 changes: 19 additions & 2 deletions .github/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ on:
schedule:
- cron: '15 1/6 * * *'
workflow_dispatch:

# TODO: .github/trigger_files/beam_PreCommit_Java_Debezium_IO_Direct.json seems not to exist
#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
Expand Down Expand Up @@ -83,15 +83,30 @@ jobs:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup environment
- name: Setup environment (Sets up default Java)
uses: ./.github/actions/setup-environment-action
with:
distribution: 'temurin'
java-version: 17
- name: Set up JDK 17 (ensures it's available and its path can be captured)
id: setup_jdk_17 # Crucial: give it an ID
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '17'
- name: Debug JDK 17 Path
run: |
echo "Installed JDK 17 path for ORG_GRADLE_PROJECT_java17Home is: ${{ steps.setup_jdk_17.outputs.path }}"
echo "Also, JAVA_HOME_17_X64 is: $JAVA_HOME_17_X64" # Check if this is also set by setup-java or the environment
- name: run Debezium IO build task
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:io:debezium:build
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
-Pjava17Home=$JAVA_HOME_17_X64 \
-PtestJavaVersion=17 \
- name: run Debezium IO additional tasks
uses: ./.github/actions/gradle-command-self-hosted-action
with:
Expand All @@ -101,6 +116,8 @@ jobs:
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
-Pjava17Home=$JAVA_HOME_17_X64 \
-PtestJavaVersion=17 \
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources
sdks/java/maven-archetypes/gcp-bom-examples/src/main/resources/archetype-resources/src/
sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/sample.txt

# Ignore generated debezium data
sdks/java/io/debezium/data/

# Ignore files generated by the Python build process.
**/*.pyc
**/*.pyo
Expand Down
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Debezium IO (Java) has been upgraded from depending on version 1.3.1.Final of io.debezium to 3.1.1.Final. This may cause some breaking changes since the libraries do not maintain full compatibility ([#33526](https://github.com/apache/beam/issues/33526)).

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ class BeamModulePlugin implements Plugin<Project> {
activemq_junit : "org.apache.activemq.tooling:activemq-junit:$activemq_version",
activemq_kahadb_store : "org.apache.activemq:activemq-kahadb-store:$activemq_version",
activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version",
antlr : "org.antlr:antlr4:4.7",
antlr_runtime : "org.antlr:antlr4-runtime:4.7",
antlr : "org.antlr:antlr4:4.10",
antlr_runtime : "org.antlr:antlr4-runtime:4.10",
args4j : "args4j:args4j:2.33",
auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version",
avro : "org.apache.avro:avro:1.11.4",
Expand Down
111 changes: 97 additions & 14 deletions sdks/java/io/debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,100 @@ provideIntegrationTestingDependencies()
description = "Apache Beam :: SDKs :: Java :: IO :: Debezium"
ext.summary = "Library to work with Debezium data."

// Set Java 17 compatibility for this module
java {

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
// Configure the compileJava and compileTestJava tasks directly
tasks.withType(JavaCompile).configureEach { task ->
// Only configure for the current project's main and test compile tasks
if (task.project == project && (task.name == 'compileJava' || task.name == 'compileTestJava')) {
logger.lifecycle("[BUILD_GRADLE] Debezium IO: Attempting to configure ${task.path} for Java 17.")

// Read the JDK 17 home path from the environment variable
String jdk17HomePath = System.getenv("JAVA_HOME_17_X64")

if (jdk17HomePath != null && !jdk17HomePath.trim().isEmpty() && project.file(jdk17HomePath.trim()).isDirectory()) {
logger.lifecycle("[BUILD_GRADLE] Debezium IO: Using JAVA_HOME_17_X64='${jdk17HomePath}' for ${task.path}.")
task.options.fork = true
task.options.forkOptions.javaHome = project.file(jdk17HomePath.trim())

// Explicitly set compiler arguments for Java 17 mode.
// This overrides any defaults (like --release 11) potentially set by BeamModulePlugin.
def newCompilerArgs = task.options.compilerArgs.findAll { arg ->
!arg.startsWith('-source') && !arg.startsWith('-target') && !arg.startsWith('--release')
}
newCompilerArgs.addAll(['--release', '17'])
task.options.compilerArgs = newCompilerArgs
logger.lifecycle("[BUILD_GRADLE] Debezium IO: ${task.path} compilerArgs set to: ${task.options.compilerArgs.join(' ')}")

// Handle ErrorProne specific arguments for Java 17 if errorprone is enabled for this task
if (task.extensions.findByName("errorprone") != null && project.rootProject.ext.has('errorProneAddModuleOpts')) {
def errorProneAddModuleOpts = project.rootProject.ext.errorProneAddModuleOpts
task.options.forkOptions.jvmArgs += errorProneAddModuleOpts.collect { '-J' + it }
// Potentially: task.options.errorprone.errorproneArgs.add("-XepDisableAllChecks")
logger.lifecycle("[BUILD_GRADLE] Debezium IO: ${task.path} - Added ErrorProne JVM args for forked Java 17 compiler.")
}
} else {
logger.warn("[BUILD_GRADLE] Debezium IO: Environment variable JAVA_HOME_17_X64 not found, empty, or not a valid directory. Value: '${jdk17HomePath ?: 'null'}'. ${task.path} will use default JDK. Java 17 is required for Debezium IO.")
// If JAVA_HOME_17_X64 isn't found, we still try to make the default compiler (if it's 17) behave correctly.
// Or if default is 11, this will likely lead to bad class file version if it finds Debezium JARs.
def newCompilerArgs = task.options.compilerArgs.findAll { arg ->
!arg.startsWith('-source') && !arg.startsWith('-target') && !arg.startsWith('--release')
}
newCompilerArgs.addAll(['--release', '17']) // Attempt to set for default JDK too
task.options.compilerArgs = newCompilerArgs
}
}
}
// This block ensures that if 'java17Home' is passed as a Gradle property,
// the 'compileJava' task for this module will use it.
// project.afterEvaluate { // Use afterEvaluate to ensure 'compileJava' task exists and is configured by BeamModulePlugin
// if (project.hasProperty("java17Home") && project.property("java17Home").toString().trim() != "") {
// tasks.named('compileJava', JavaCompile) {
// logger.lifecycle("Debezium IO: Configuring 'compileJava' to fork and use Java 17 from java17Home: ${project.property("java17Home")}")
// options.fork = true // Essential for using a different javaHome
// options.forkOptions.javaHome = project.file(project.property("java17Home"))

// // Ensure compiler arguments are set for Java 17, overriding defaults.
// // BeamModulePlugin might set '--release 8' or '--release 11' by default.
// def newCompilerArgs = options.compilerArgs.findAll { arg ->
// !arg.startsWith('-source') && !arg.startsWith('-target') && !arg.startsWith('--release')
// }
// newCompilerArgs.addAll(['--release', '17']) // Use --release for better JDK compatibility.
// options.compilerArgs = newCompilerArgs

// // Handle ErrorProne arguments if necessary for Java 17 compilation,
// // similar to how setJavaVerOptions does it in BeamModulePlugin.groovy
// if (project.rootProject.ext.has('errorProneAddModuleOpts')) {
// def errorProneAddModuleOpts = project.rootProject.ext.errorProneAddModuleOpts
// // The -J prefix is needed to workaround https://github.com/gradle/gradle/issues/22747 when forking
// options.forkOptions.jvmArgs += errorProneAddModuleOpts.collect { '-J' + it }
// // May also need: options.errorprone.errorproneArgs.add("-XepDisableAllChecks")
// // if error prone is active and causes issues with Java 17. Check BeamModulePlugin for exact args.
// }
// }
// } else {
// logger.lifecycle("Debezium IO: 'java17Home' property not found or empty. Relying on Gradle toolchain detection for Java 17 for 'compileJava'.")
// }

// // If you also want compileTestJava to use this (good for consistency):
// if (project.hasProperty("testJavaVersion") && project.property("testJavaVersion") == "17" &&
// project.hasProperty("java17Home") && project.property("java17Home").toString().trim() != "") {
// tasks.named('compileTestJava', JavaCompile) {
// logger.lifecycle("Debezium IO: Configuring 'compileTestJava' to use Java 17 from java17Home via testJavaVersion=17 property.")
// // The BeamModulePlugin's logic for testJavaVersion should automatically
// // call setJavaVerOptions, which will use java17Home.
// // You typically don't need to re-configure it here if -PtestJavaVersion=17 is passed.
// // Just ensure source/target are aligned if not already.
// sourceCompatibility = JavaVersion.VERSION_17
// targetCompatibility = JavaVersion.VERSION_17
// }
// }
// }

dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.vendored_grpc_1_69_0
Expand Down Expand Up @@ -60,9 +154,9 @@ dependencies {
permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761

// Debezium dependencies
implementation group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.3.1.Final'
implementation group: 'io.debezium', name: 'debezium-core', version: '3.1.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '3.1.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final'
}

test {
Expand All @@ -89,14 +183,3 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
useJUnit {
}
}

configurations.all (Configuration it) -> {
resolutionStrategy {
// Force protobuf 3 because debezium is currently incompatible with protobuf 4.
// TODO - remove this and upgrade the version of debezium once a proto-4 compatible version is available
// https://github.com/apache/beam/pull/33526 does some of this, but was abandoned because it still doesn't
// work with protobuf 4.
force "com.google.protobuf:protobuf-java:3.25.5"
force "com.google.protobuf:protobuf-java-util:3.25.5"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,11 @@ public Map<String, String> getConfigurationMap() {
configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue());
}

// Set default Database History impl. if not provided
// Set default Database History implementation and Kafka topic prefix, if not provided
configuration.computeIfAbsent(
"database.history",
k -> KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName());
configuration.computeIfAbsent("topic.prefix", k -> "beam-debezium-connector");

String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> ").join(configuration);
LOG.debug("---------------- Connector configuration: {}", stringProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
connectorConfiguration
.withConnectionProperty("table.include.list", configuration.getTable())
.withConnectionProperty("include.schema.changes", "false")
.withConnectionProperty("database.server.name", "beam-pipeline-server");
// add random unique name/identifier for server to identify connector
.withConnectionProperty("database.server.name", "beam-pipeline-server")
.withConnectionProperty("database.server.id", "579676")
.withConnectionProperty(
"schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
.withConnectionProperty(
"schema.history.internal.file.filename", "data/schema_history.dat");
if (configuration.getDatabase().equals("POSTGRES")) {
LOG.info(
"As Database is POSTGRES, we set the `database.dbname` property to {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.SchemaHistoryException;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -474,7 +474,7 @@ public IsBounded isBounded() {
}
}

public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory {
public static class DebeziumSDFDatabaseHistory extends AbstractSchemaHistory {
private List<byte[]> history;

public DebeziumSDFDatabaseHistory() {
Expand All @@ -497,7 +497,7 @@ public void start() {
}

@Override
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
LOG.debug("------------- Adding history! {}", record);

history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void testDebeziumIOMySql() {
.withMaxNumberOfRecords(30)
.withCoder(StringUtf8Coder.of()));
String expected =
"{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
"{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.9.8.Final\",\"name\":\"dbserver1\","
+ "\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,"
+ "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\","
+ "\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;

import io.debezium.DebeziumException;
import java.time.Duration;
import java.util.Arrays;
import java.util.stream.Collectors;
Expand All @@ -28,7 +29,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.connect.errors.ConnectException;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -124,15 +124,16 @@ public void testNoProblem() {
result.getSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toList()),
Matchers.containsInAnyOrder("before", "after", "source", "op", "ts_ms", "transaction"));
Matchers.containsInAnyOrder(
"before", "after", "source", "op", "ts_ms", "ts_us", "ts_ns", "transaction"));
}

@Test
public void testWrongUser() {
Pipeline readPipeline = Pipeline.create();
ConnectException ex =
DebeziumException ex =
assertThrows(
ConnectException.class,
DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(
Expand All @@ -151,9 +152,9 @@ public void testWrongUser() {
@Test
public void testWrongPassword() {
Pipeline readPipeline = Pipeline.create();
ConnectException ex =
DebeziumException ex =
assertThrows(
ConnectException.class,
DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(
Expand All @@ -172,9 +173,9 @@ public void testWrongPassword() {
@Test
public void testWrongPort() {
Pipeline readPipeline = Pipeline.create();
ConnectException ex =
DebeziumException ex =
assertThrows(
ConnectException.class,
DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(makePtransform(userName, password, database, 12345, "localhost"))
Expand Down
Loading