diff --git a/.github/actions/setup-environment-action/action.yml b/.github/actions/setup-environment-action/action.yml index d5f1f879a072..bb6c6940b4fd 100644 --- a/.github/actions/setup-environment-action/action.yml +++ b/.github/actions/setup-environment-action/action.yml @@ -71,10 +71,10 @@ runs: - name: Install Java if: ${{ inputs.java-version != '' }} - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: 'temurin' - java-version: ${{ inputs.java-version == 'default' && '11' || inputs.java-version }} + java-version: ${{ inputs.java-version == 'default' && '17' || inputs.java-version }} - name: Setup Gradle uses: gradle/gradle-build-action@v2 with: diff --git a/.github/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml b/.github/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml index c4b747cdfb29..4af14e63c009 100644 --- a/.github/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml +++ b/.github/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml @@ -33,7 +33,7 @@ on: schedule: - cron: '15 1/6 * * *' workflow_dispatch: - +# TODO: .github/trigger_files/beam_PreCommit_Java_Debezium_IO_Direct.json seems not to exist #Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event permissions: actions: write @@ -83,8 +83,21 @@ jobs: comment_phrase: ${{ matrix.job_phrase }} github_token: ${{ secrets.GITHUB_TOKEN }} github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) - - name: Setup environment + - name: Setup environment (Sets up default Java) uses: ./.github/actions/setup-environment-action + with: + distribution: 'temurin' + java-version: 17 + - name: Set up JDK 17 (ensures it's available and its path can be captured) + id: setup_jdk_17 # Crucial: give it an ID + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '17' + - name: Debug JDK 17 Path + run: | + echo "Installed JDK 17 path for ORG_GRADLE_PROJECT_java17Home is: ${{ 
steps.setup_jdk_17.outputs.path }}" + echo "Also, JAVA_HOME_17_X64 is: $JAVA_HOME_17_X64" # Check if this is also set by setup-java or the environment - name: run Debezium IO build task uses: ./.github/actions/gradle-command-self-hosted-action with: @@ -92,6 +105,8 @@ jobs: arguments: | -PdisableSpotlessCheck=true \ -PdisableCheckStyle=true \ + -Pjava17Home=$JAVA_HOME_17_X64 \ + -PtestJavaVersion=17 \ - name: run Debezium IO additional tasks uses: ./.github/actions/gradle-command-self-hosted-action with: @@ -101,6 +116,8 @@ jobs: arguments: | -PdisableSpotlessCheck=true \ -PdisableCheckStyle=true \ + -Pjava17Home=$JAVA_HOME_17_X64 \ + -PtestJavaVersion=17 \ - name: Archive JUnit Test Results uses: actions/upload-artifact@v4 if: ${{ !success() }} diff --git a/.gitignore b/.gitignore index 2bad81975ba0..84141bc6d05e 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,9 @@ sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources sdks/java/maven-archetypes/gcp-bom-examples/src/main/resources/archetype-resources/src/ sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/sample.txt +# Ignore generated debezium data +sdks/java/io/debezium/data/ + # Ignore files generated by the Python build process. **/*.pyc **/*.pyo diff --git a/CHANGES.md b/CHANGES.md index 93353245042a..5195f154e634 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -76,7 +76,7 @@ ## Breaking Changes -* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* Debezium IO (Java) now depends on io.debezium 3.1.1.Final (upgraded from 1.3.1.Final) and requires Java 17. The two Debezium versions are not fully compatible, so existing pipelines may need changes ([#33526](https://github.com/apache/beam/issues/33526)). 
## Deprecations diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 472ece04f9e4..b6ec4f20ef04 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -668,8 +668,8 @@ class BeamModulePlugin implements Plugin { activemq_junit : "org.apache.activemq.tooling:activemq-junit:$activemq_version", activemq_kahadb_store : "org.apache.activemq:activemq-kahadb-store:$activemq_version", activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version", - antlr : "org.antlr:antlr4:4.7", - antlr_runtime : "org.antlr:antlr4-runtime:4.7", + antlr : "org.antlr:antlr4:4.10", + antlr_runtime : "org.antlr:antlr4-runtime:4.10", args4j : "args4j:args4j:2.33", auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version", avro : "org.apache.avro:avro:1.11.4", diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle index 2070bcfc873c..3fb920b8d043 100644 --- a/sdks/java/io/debezium/build.gradle +++ b/sdks/java/io/debezium/build.gradle @@ -30,6 +30,100 @@ provideIntegrationTestingDependencies() description = "Apache Beam :: SDKs :: Java :: IO :: Debezium" ext.summary = "Library to work with Debezium data." 
+// Set Java 17 compatibility for this module +java { + + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} +// Configure the compileJava and compileTestJava tasks directly +tasks.withType(JavaCompile).configureEach { task -> + // Only configure for the current project's main and test compile tasks + if (task.project == project && (task.name == 'compileJava' || task.name == 'compileTestJava')) { + logger.lifecycle("[BUILD_GRADLE] Debezium IO: Attempting to configure ${task.path} for Java 17.") + + // Read the JDK 17 home path from the environment variable + String jdk17HomePath = System.getenv("JAVA_HOME_17_X64") + + if (jdk17HomePath != null && !jdk17HomePath.trim().isEmpty() && project.file(jdk17HomePath.trim()).isDirectory()) { + logger.lifecycle("[BUILD_GRADLE] Debezium IO: Using JAVA_HOME_17_X64='${jdk17HomePath}' for ${task.path}.") + task.options.fork = true + task.options.forkOptions.javaHome = project.file(jdk17HomePath.trim()) + + // Explicitly set compiler arguments for Java 17 mode. + // This overrides any defaults (like --release 11) potentially set by BeamModulePlugin. 
+ def newCompilerArgs = task.options.compilerArgs.findAll { arg -> + !arg.startsWith('-source') && !arg.startsWith('-target') && !arg.startsWith('--release') + } + newCompilerArgs.addAll(['--release', '17']) + task.options.compilerArgs = newCompilerArgs + logger.lifecycle("[BUILD_GRADLE] Debezium IO: ${task.path} compilerArgs set to: ${task.options.compilerArgs.join(' ')}") + + // Handle ErrorProne specific arguments for Java 17 if errorprone is enabled for this task + if (task.extensions.findByName("errorprone") != null && project.rootProject.ext.has('errorProneAddModuleOpts')) { + def errorProneAddModuleOpts = project.rootProject.ext.errorProneAddModuleOpts + task.options.forkOptions.jvmArgs += errorProneAddModuleOpts.collect { '-J' + it } + // Potentially: task.options.errorprone.errorproneArgs.add("-XepDisableAllChecks") + logger.lifecycle("[BUILD_GRADLE] Debezium IO: ${task.path} - Added ErrorProne JVM args for forked Java 17 compiler.") + } + } else { + logger.warn("[BUILD_GRADLE] Debezium IO: Environment variable JAVA_HOME_17_X64 not found, empty, or not a valid directory. Value: '${jdk17HomePath ?: 'null'}'. ${task.path} will use default JDK. Java 17 is required for Debezium IO.") + // If JAVA_HOME_17_X64 isn't found, we still try to make the default compiler (if it's 17) behave correctly. + // Or if default is 11, this will likely lead to bad class file version if it finds Debezium JARs. + def newCompilerArgs = task.options.compilerArgs.findAll { arg -> + !arg.startsWith('-source') && !arg.startsWith('-target') && !arg.startsWith('--release') + } + newCompilerArgs.addAll(['--release', '17']) // Attempt to set for default JDK too + task.options.compilerArgs = newCompilerArgs + } + } +} +// This block ensures that if 'java17Home' is passed as a Gradle property, +// the 'compileJava' task for this module will use it. 
+// project.afterEvaluate { // Use afterEvaluate to ensure 'compileJava' task exists and is configured by BeamModulePlugin +// if (project.hasProperty("java17Home") && project.property("java17Home").toString().trim() != "") { +// tasks.named('compileJava', JavaCompile) { +// logger.lifecycle("Debezium IO: Configuring 'compileJava' to fork and use Java 17 from java17Home: ${project.property("java17Home")}") +// options.fork = true // Essential for using a different javaHome +// options.forkOptions.javaHome = project.file(project.property("java17Home")) + +// // Ensure compiler arguments are set for Java 17, overriding defaults. +// // BeamModulePlugin might set '--release 8' or '--release 11' by default. +// def newCompilerArgs = options.compilerArgs.findAll { arg -> +// !arg.startsWith('-source') && !arg.startsWith('-target') && !arg.startsWith('--release') +// } +// newCompilerArgs.addAll(['--release', '17']) // Use --release for better JDK compatibility. +// options.compilerArgs = newCompilerArgs + +// // Handle ErrorProne arguments if necessary for Java 17 compilation, +// // similar to how setJavaVerOptions does it in BeamModulePlugin.groovy +// if (project.rootProject.ext.has('errorProneAddModuleOpts')) { +// def errorProneAddModuleOpts = project.rootProject.ext.errorProneAddModuleOpts +// // The -J prefix is needed to workaround https://github.com/gradle/gradle/issues/22747 when forking +// options.forkOptions.jvmArgs += errorProneAddModuleOpts.collect { '-J' + it } +// // May also need: options.errorprone.errorproneArgs.add("-XepDisableAllChecks") +// // if error prone is active and causes issues with Java 17. Check BeamModulePlugin for exact args. +// } +// } +// } else { +// logger.lifecycle("Debezium IO: 'java17Home' property not found or empty. 
Relying on Gradle toolchain detection for Java 17 for 'compileJava'.") +// } + +// // If you also want compileTestJava to use this (good for consistency): +// if (project.hasProperty("testJavaVersion") && project.property("testJavaVersion") == "17" && +// project.hasProperty("java17Home") && project.property("java17Home").toString().trim() != "") { +// tasks.named('compileTestJava', JavaCompile) { +// logger.lifecycle("Debezium IO: Configuring 'compileTestJava' to use Java 17 from java17Home via testJavaVersion=17 property.") +// // The BeamModulePlugin's logic for testJavaVersion should automatically +// // call setJavaVerOptions, which will use java17Home. +// // You typically don't need to re-configure it here if -PtestJavaVersion=17 is passed. +// // Just ensure source/target are aligned if not already. +// sourceCompatibility = JavaVersion.VERSION_17 +// targetCompatibility = JavaVersion.VERSION_17 +// } +// } +// } + dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation library.java.vendored_grpc_1_69_0 @@ -60,9 +154,9 @@ dependencies { permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761 // Debezium dependencies - implementation group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final' - testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final' - testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.3.1.Final' + implementation group: 'io.debezium', name: 'debezium-core', version: '3.1.1.Final' + testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '3.1.1.Final' + testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final' } test { @@ -89,14 +183,3 @@ task integrationTest(type: Test, dependsOn: processTestResources) { useJUnit { } } - -configurations.all (Configuration it) -> { - resolutionStrategy { - // Force protobuf 3 because debezium is currently 
incompatible with protobuf 4. - // TODO - remove this and upgrade the version of debezium once a proto-4 compatible version is available - // https://github.com/apache/beam/pull/33526 does some of this, but was abandoned because it still doesn't - // work with protobuf 4. - force "com.google.protobuf:protobuf-java:3.25.5" - force "com.google.protobuf:protobuf-java-util:3.25.5" - } -} diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java index 30ad8a5f9f74..0d5c317e0856 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java @@ -554,10 +554,11 @@ public Map getConfigurationMap() { configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue()); } - // Set default Database History impl. if not provided + // Set default schema history implementation and Kafka topic prefix, if not provided configuration.computeIfAbsent( - "database.history", + "schema.history.internal", k -> KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName()); + configuration.computeIfAbsent("topic.prefix", k -> "beam-debezium-connector"); String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> ").join(configuration); LOG.debug("---------------- Connector configuration: {}", stringProperties); diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index 9f227708e5e6..3d088121cd84 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -110,7 +110,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { 
connectorConfiguration .withConnectionProperty("table.include.list", configuration.getTable()) .withConnectionProperty("include.schema.changes", "false") - .withConnectionProperty("database.server.name", "beam-pipeline-server"); + // Fixed (not random) server name/id identify this connector; NOTE(review): confirm a shared database.server.id is safe + .withConnectionProperty("database.server.name", "beam-pipeline-server") + .withConnectionProperty("database.server.id", "579676") + .withConnectionProperty( + "schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory") + .withConnectionProperty( + "schema.history.internal.file.filename", "data/schema_history.dat"); if (configuration.getDatabase().equals("POSTGRES")) { LOG.info( "As Database is POSTGRES, we set the `database.dbname` property to {}.", diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java index 54330d620477..a1edfb210548 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java @@ -22,9 +22,9 @@ import io.debezium.document.Document; import io.debezium.document.DocumentReader; import io.debezium.document.DocumentWriter; -import io.debezium.relational.history.AbstractDatabaseHistory; -import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.SchemaHistoryException; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; @@ -474,7 +474,7 @@ public IsBounded isBounded() { } } - public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory { + public static class DebeziumSDFDatabaseHistory extends AbstractSchemaHistory { private 
List history; public DebeziumSDFDatabaseHistory() { @@ -497,7 +497,7 @@ public void start() { } @Override - protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { LOG.debug("------------- Adding history! {}", record); history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document())); diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java index 12ba57bad45d..ab4f71de676d 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java @@ -210,7 +210,7 @@ public void testDebeziumIOMySql() { .withMaxNumberOfRecords(30) .withCoder(StringUtf8Coder.of())); String expected = - "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\"," + "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"3.1.1.Final\",\"name\":\"dbserver1\"," + "\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null," + "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\"," + "\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001," diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java index c4b5d2d1f890..de34259bffb2 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static 
org.junit.Assert.assertThrows; +import io.debezium.DebeziumException; import java.time.Duration; import java.util.Arrays; import java.util.stream.Collectors; @@ -28,7 +29,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.kafka.connect.errors.ConnectException; import org.hamcrest.Matchers; import org.junit.ClassRule; import org.junit.Test; @@ -124,15 +124,16 @@ public void testNoProblem() { result.getSchema().getFields().stream() .map(field -> field.getName()) .collect(Collectors.toList()), - Matchers.containsInAnyOrder("before", "after", "source", "op", "ts_ms", "transaction")); + Matchers.containsInAnyOrder( + "before", "after", "source", "op", "ts_ms", "ts_us", "ts_ns", "transaction")); } @Test public void testWrongUser() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply( @@ -151,9 +152,9 @@ public void testWrongUser() { @Test public void testWrongPassword() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply( @@ -172,9 +173,9 @@ public void testWrongPassword() { @Test public void testWrongPort() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply(makePtransform(userName, password, database, 12345, "localhost"))