From fdb77f893b29f8daecd86f6a83cf000a7efcf913 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 21 Feb 2025 21:14:51 -0500 Subject: [PATCH] Bump to Mokito 4 * Split tests need mockStatic outside of DataflowTest To be able to run DataflowTest internally which did not support mockStatic * Remove usage of powermock for GcpIO, KafkaIO, DataflowRunner tests * Minimize powermock dependency for Flink runner --- .../beam/gradle/BeamModulePlugin.groovy | 7 +- runners/flink/flink_runner.gradle | 3 +- .../metrics/FlinkMetricContainerTest.java | 6 +- .../dataflow/DataflowRunnerStaticTest.java | 196 ++++++++++++++++++ .../runners/dataflow/DataflowRunnerTest.java | 112 +--------- .../dataflow/util/PackageUtilTest.java | 42 ++-- .../worker/build.gradle | 1 - .../dataflow/worker/HotKeyLoggerTest.java | 7 +- .../state/WindmillStateInternalsTest.java | 26 +-- .../BundleFinalizationHandlersTest.java | 6 +- .../runtime/ClassicBundleManagerTest.java | 16 +- .../SparkExecutableStageFunctionTest.java | 4 +- .../resources/beam/checkstyle/checkstyle.xml | 8 - sdks/java/expansion-service/build.gradle | 4 - .../io/google-cloud-platform/build.gradle | 3 +- .../io/gcp/pubsub/PubsubUnboundedSink.java | 13 +- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 4 +- .../firestore/BaseFirestoreV1WriteFnTest.java | 2 +- .../io/gcp/pubsub/PubsubIOExternalTest.java | 11 +- .../ChildPartitionsRecordActionTest.java | 4 +- .../dao/PartitionMetadataDaoTest.java | 17 +- .../io/gcp/testing/BigqueryClientTest.java | 18 +- .../io/gcp/testing/BigqueryMatcherTest.java | 21 +- .../hadoop/format/HadoopFormatIOReadTest.java | 3 +- sdks/java/io/influxdb/build.gradle | 10 +- sdks/java/io/kafka/build.gradle | 4 +- .../apache/beam/sdk/io/kafka/KafkaWriter.java | 6 + .../sdk/io/kafka/KafkaIOExternalTest.java | 7 +- .../sdk/io/kafka/ProducerRecordCoderTest.java | 22 +- 29 files changed, 343 insertions(+), 240 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerStaticTest.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 1065794da798..b12ddee740be 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -629,7 +629,6 @@ class BeamModulePlugin implements Plugin { // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom def netty_version = "4.1.110.Final" def postgres_version = "42.2.16" - def powermock_version = "2.0.9" // [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom def protobuf_version = "4.29.0" def qpid_jms_client_version = "0.61.0" @@ -837,8 +836,8 @@ class BeamModulePlugin implements Plugin { log4j2_core : "org.apache.logging.log4j:log4j-core:$log4j2_version", log4j2_to_slf4j : "org.apache.logging.log4j:log4j-to-slf4j:$log4j2_version", log4j2_slf4j_impl : "org.apache.logging.log4j:log4j-slf4j-impl:$log4j2_version", - mockito_core : "org.mockito:mockito-core:3.7.7", - mockito_inline : "org.mockito:mockito-inline:4.5.1", + mockito_core : "org.mockito:mockito-core:4.11.0", + mockito_inline : "org.mockito:mockito-inline:4.11.0", mongo_java_driver : "org.mongodb:mongo-java-driver:3.12.11", nemo_compiler_frontend_beam : "org.apache.nemo:nemo-compiler-frontend-beam:$nemo_version", netty_all : "io.netty:netty-all:$netty_version", @@ -847,8 +846,6 @@ class BeamModulePlugin implements Plugin { netty_transport : "io.netty:netty-transport:$netty_version", netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version", postgres : "org.postgresql:postgresql:$postgres_version", - powermock : "org.powermock:powermock-module-junit4:$powermock_version", - powermock_mockito : "org.powermock:powermock-api-mockito2:$powermock_version", protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version", protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version", proto_google_cloud_bigquery_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1", // google_cloud_platform_libraries_bom sets version diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index a4b6af7d70b6..f831ec86319c 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -201,7 +201,8 @@ dependencies { testImplementation library.java.hamcrest testImplementation library.java.junit testImplementation library.java.mockito_core - testImplementation library.java.powermock + // TODO(https://github.com/apache/beam/issues/34056) remove powermock once remove Whitebox usages + testImplementation "org.powermock:powermock-reflect:2.0.9" testImplementation library.java.google_api_services_bigquery testImplementation project(":sdks:java:io:google-cloud-platform") testImplementation library.java.jackson_dataformat_yaml diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java index 4b027465618c..663af0540852 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java @@ -21,7 +21,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertNotNull; -import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -101,7 +101,7 @@ public void testCounter() { public void testGauge() { FlinkMetricContainer.FlinkGauge flinkGauge = new FlinkMetricContainer.FlinkGauge(GaugeResult.empty()); - when(metricGroup.gauge(eq("namespace.name"), anyObject())).thenReturn(flinkGauge); + when(metricGroup.gauge(eq("namespace.name"), any())).thenReturn(flinkGauge); MetricsContainer step = container.getMetricsContainer("step"); MetricName metricName = MetricName.named("namespace", "name"); @@ -251,7 +251,7 @@ public boolean matches(FlinkDistributionGauge argument) { public void testDistribution() { FlinkMetricContainer.FlinkDistributionGauge flinkGauge = new FlinkMetricContainer.FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT); - when(metricGroup.gauge(eq("namespace.name"), anyObject())).thenReturn(flinkGauge); + when(metricGroup.gauge(eq("namespace.name"), any())).thenReturn(flinkGauge); MetricsContainer step = container.getMetricsContainer("step"); MetricName metricName = MetricName.named("namespace", "name"); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerStaticTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerStaticTest.java new file mode 100644 index 000000000000..354ecb5e2c2f --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerStaticTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mock; + +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.Job; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DefaultGcpRegionFactory; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.MimeTypes; +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +/** + * Tests for the {@link DataflowRunner} that involves mock static methods. + * + *

Separated from {@link DataflowRunnerTest}. + */ +@RunWith(JUnit4.class) +public class DataflowRunnerStaticTest { + private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp"; + private static final String PROJECT_ID = "some-project"; + private static final String REGION_ID = "some-region-1"; + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); + private transient Dataflow.Projects.Locations.Jobs mockJobs; + private transient GcsUtil mockGcsUtil; + + private DataflowPipelineOptions buildPipelineOptions() throws IOException { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowRunner.class); + options.setProject(PROJECT_ID); + options.setTempLocation(VALID_TEMP_BUCKET); + options.setRegion(REGION_ID); + // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath. + options.setFilesToStage(new ArrayList<>()); + options.setDataflowClient(DataflowRunnerTest.buildMockDataflow(mockJobs)); + options.setGcsUtil(mockGcsUtil); + options.setGcpCredential(new TestCredential()); + + // Configure the FileSystem registrar to use these options. + FileSystems.setDefaultPipelineOptions(options); + + return options; + } + + @Before + public void setUp() throws IOException { + mockGcsUtil = DataflowRunnerTest.buildMockGcsUtil(); + mockJobs = mock(Dataflow.Projects.Locations.Jobs.class); + } + + /** + * Test that the region is set in the generated JSON pipeline options even when a default value is + * grabbed from the environment. + */ + @Test + public void testDefaultRegionSet() throws Exception { + try (MockedStatic mocked = + Mockito.mockStatic(DefaultGcpRegionFactory.class)) { + mocked.when(DefaultGcpRegionFactory::getRegionFromEnvironment).thenReturn(REGION_ID); + Dataflow.Projects.Locations.Jobs mockJobs = mock(Dataflow.Projects.Locations.Jobs.class); + + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowRunner.class); + options.setProject(PROJECT_ID); + options.setTempLocation(VALID_TEMP_BUCKET); + // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath. + options.setFilesToStage(new ArrayList<>()); + options.setDataflowClient(DataflowRunnerTest.buildMockDataflow(mockJobs)); + options.setGcsUtil(DataflowRunnerTest.buildMockGcsUtil()); + options.setGcpCredential(new TestCredential()); + + Pipeline p = Pipeline.create(options); + p.run(); + + ArgumentCaptor jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); + Map sdkPipelineOptions = + jobCaptor.getValue().getEnvironment().getSdkPipelineOptions(); + + assertThat(sdkPipelineOptions, hasKey("options")); + Map optionsMap = (Map) sdkPipelineOptions.get("options"); + assertThat(optionsMap, hasEntry("region", options.getRegion())); + } + } + + /** + * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate + * exception when an output file throws IOException at close. + */ + @Test + public void testTemplateRunnerLoggedErrorForFileCloseError() throws Exception { + File templateLocation = tmpFolder.newFile(); + String closeErrorMessage = "Unable to close"; + + try (MockedStatic mocked = + Mockito.mockStatic(FileSystems.class, CALLS_REAL_METHODS)) { + mocked + .when( + () -> + FileSystems.create( + FileSystems.matchNewResource(templateLocation.getPath(), false), + MimeTypes.TEXT)) + .thenReturn( + DataflowRunnerTest.createWritableByteChannelThrowsIOExceptionAtClose( + closeErrorMessage)); + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setTemplateLocation(templateLocation.getPath()); + Pipeline p = Pipeline.create(options); + + thrown.expectMessage("Cannot create output file at"); + thrown.expect(RuntimeException.class); + thrown.expectCause(Matchers.isA(IOException.class)); + thrown.expectCause(hasProperty("message", is(closeErrorMessage))); + + p.run(); + } + } + + /** + * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate + * exception when an output file throws IOException at close. + */ + @Test + public void testTemplateRunnerLoggedErrorForFileWriteError() throws Exception { + File templateLocation = tmpFolder.newFile(); + String closeErrorMessage = "Unable to write"; + + try (MockedStatic mocked = + Mockito.mockStatic(FileSystems.class, CALLS_REAL_METHODS)) { + mocked + .when( + () -> + FileSystems.create( + FileSystems.matchNewResource(templateLocation.getPath(), false), + MimeTypes.TEXT)) + .thenReturn( + DataflowRunnerTest.createWritableByteChannelThrowsIOExceptionAtWrite( + closeErrorMessage)); + + thrown.expectMessage("Cannot create output file at"); + thrown.expect(RuntimeException.class); + thrown.expectCause(Matchers.isA(IOException.class)); + thrown.expectCause(hasProperty("message", is(closeErrorMessage))); + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setTemplateLocation(templateLocation.getPath()); + Pipeline p = Pipeline.create(options); + + p.run(); + } + } +} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 73624ee91e6d..83cbfbafa79a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -45,7 +45,6 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; -import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -98,7 +97,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; -import org.apache.beam.runners.dataflow.options.DefaultGcpRegionFactory; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; @@ -153,7 +151,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.construction.BeamUrns; import org.apache.beam.sdk.util.construction.Environments; @@ -192,7 +189,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; -import org.mockito.MockedStatic; import org.mockito.Mockito; /** @@ -270,8 +266,7 @@ private static Pipeline buildDataflowPipelineWithLargeGraph(DataflowPipelineOpti return p; } - private static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs mockJobs) - throws IOException { + static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs mockJobs) throws IOException { Dataflow mockDataflowClient = mock(Dataflow.class); Dataflow.Projects mockProjects = mock(Dataflow.Projects.class); Dataflow.Projects.Locations mockLocations = mock(Dataflow.Projects.Locations.class); @@ -303,7 +298,7 @@ private static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs mockJ return mockDataflowClient; } - private static GcsUtil buildMockGcsUtil() throws IOException { + static GcsUtil buildMockGcsUtil() throws IOException { GcsUtil mockGcsUtil = mock(GcsUtil.class); when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class))) @@ -490,41 +485,6 @@ public void testSettingOfSdkPipelineOptions() throws IOException { assertThat(optionsMap, hasEntry("region", options.getRegion())); } - /** - * Test that the region is set in the generated JSON pipeline options even when a default value is - * grabbed from the environment. - */ - @Test - public void testDefaultRegionSet() throws Exception { - try (MockedStatic mocked = - Mockito.mockStatic(DefaultGcpRegionFactory.class)) { - mocked.when(DefaultGcpRegionFactory::getRegionFromEnvironment).thenReturn(REGION_ID); - Dataflow.Projects.Locations.Jobs mockJobs = mock(Dataflow.Projects.Locations.Jobs.class); - - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowRunner.class); - options.setProject(PROJECT_ID); - options.setTempLocation(VALID_TEMP_BUCKET); - // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath. - options.setFilesToStage(new ArrayList<>()); - options.setDataflowClient(buildMockDataflow(mockJobs)); - options.setGcsUtil(buildMockGcsUtil()); - options.setGcpCredential(new TestCredential()); - - Pipeline p = Pipeline.create(options); - p.run(); - - ArgumentCaptor jobCaptor = ArgumentCaptor.forClass(Job.class); - Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); - Map sdkPipelineOptions = - jobCaptor.getValue().getEnvironment().getSdkPipelineOptions(); - - assertThat(sdkPipelineOptions, hasKey("options")); - Map optionsMap = (Map) sdkPipelineOptions.get("options"); - assertThat(optionsMap, hasEntry("region", options.getRegion())); - } - } - @Test public void testSettingFlexRS() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); @@ -2022,7 +1982,7 @@ public void testTemplateRunnerLoggedErrorForFileNotCreatable() throws Exception p.run(); } - private static WritableByteChannel createWritableByteChannelThrowsIOExceptionAtClose( + static WritableByteChannel createWritableByteChannelThrowsIOExceptionAtClose( String errorMessage) { return new WritableByteChannel() { @Override @@ -2044,39 +2004,7 @@ public void close() throws IOException { }; } - /** - * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate - * exception when an output file throws IOException at close. - */ - @Test - public void testTemplateRunnerLoggedErrorForFileCloseError() throws Exception { - File templateLocation = tmpFolder.newFile(); - String closeErrorMessage = "Unable to close"; - - try (MockedStatic mocked = - Mockito.mockStatic(FileSystems.class, CALLS_REAL_METHODS)) { - mocked - .when( - () -> - FileSystems.create( - FileSystems.matchNewResource(templateLocation.getPath(), false), - MimeTypes.TEXT)) - .thenReturn(createWritableByteChannelThrowsIOExceptionAtClose(closeErrorMessage)); - - DataflowPipelineOptions options = buildPipelineOptions(); - options.setTemplateLocation(templateLocation.getPath()); - Pipeline p = Pipeline.create(options); - - thrown.expectMessage("Cannot create output file at"); - thrown.expect(RuntimeException.class); - thrown.expectCause(Matchers.isA(IOException.class)); - thrown.expectCause(hasProperty("message", is(closeErrorMessage))); - - p.run(); - } - } - - private static WritableByteChannel createWritableByteChannelThrowsIOExceptionAtWrite( + static WritableByteChannel createWritableByteChannelThrowsIOExceptionAtWrite( String errorMessage) { return new WritableByteChannel() { @Override @@ -2094,38 +2022,6 @@ public void close() {} }; } - /** - * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate - * exception when an output file throws IOException at close. - */ - @Test - public void testTemplateRunnerLoggedErrorForFileWriteError() throws Exception { - File templateLocation = tmpFolder.newFile(); - String closeErrorMessage = "Unable to write"; - - try (MockedStatic mocked = - Mockito.mockStatic(FileSystems.class, CALLS_REAL_METHODS)) { - mocked - .when( - () -> - FileSystems.create( - FileSystems.matchNewResource(templateLocation.getPath(), false), - MimeTypes.TEXT)) - .thenReturn(createWritableByteChannelThrowsIOExceptionAtWrite(closeErrorMessage)); - - thrown.expectMessage("Cannot create output file at"); - thrown.expect(RuntimeException.class); - thrown.expectCause(Matchers.isA(IOException.class)); - thrown.expectCause(hasProperty("message", is(closeErrorMessage))); - - DataflowPipelineOptions options = buildPipelineOptions(); - options.setTemplateLocation(templateLocation.getPath()); - Pipeline p = Pipeline.create(options); - - p.run(); - } - } - @Test public void testGetContainerImageForJobFromOption() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index c892cab57d5d..670032425eb2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -28,7 +28,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyListOf; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -257,7 +257,7 @@ public void testPackageNamingWithDirectoriesHavingSameContentsButDifferentNames( @Test public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create( @@ -278,7 +278,7 @@ public void testPackageUploadWithFileSucceeds() throws Exception { Pipe pipe = Pipe.open(); String contents = "This is a test!"; File tmpFile = makeFileWithContents("file.txt", contents); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -293,7 +293,7 @@ public void testPackageUploadWithFileSucceeds() throws Exception { createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); - verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyList()); verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)); verifyNoMoreInteractions(mockGcsUtil); @@ -308,7 +308,7 @@ public void testPackageUploadWithFileSucceeds() throws Exception { public void testStagingPreservesClasspath() throws Exception { File smallFile = makeFileWithContents("small.txt", "small"); File largeFile = makeFileWithContents("large.log", "large contents"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -338,7 +338,7 @@ public void testPackageUploadWithDirectorySucceeds() throws Exception { makeFileWithContents("folder/file.txt", "This is a test!"); makeFileWithContents("folder/directory/file.txt", "This is also a test!"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -350,7 +350,7 @@ public void testPackageUploadWithDirectorySucceeds() throws Exception { STAGING_PATH, createOptions); - verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyList()); verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)); verifyNoMoreInteractions(mockGcsUtil); @@ -372,7 +372,7 @@ public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception { Pipe pipe = Pipe.open(); File tmpDirectory = tmpFolder.newFolder("folder"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -386,7 +386,7 @@ public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception { createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); - verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyList()); verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)); verifyNoMoreInteractions(mockGcsUtil); @@ -401,7 +401,7 @@ public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception { @Test(expected = RuntimeException.class) public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -416,7 +416,7 @@ public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception { fastNanoClockAndSleeper::sleep, createOptions); } finally { - verify(mockGcsUtil, times(5)).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil, times(5)).getObjects(anyList()); verify(mockGcsUtil, times(5)).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)); verifyNoMoreInteractions(mockGcsUtil); } @@ -425,7 +425,7 @@ public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception { @Test public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -461,7 +461,7 @@ public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() thr "Stale credentials can be resolved by executing 'gcloud auth application-default " + "login'"))); } finally { - verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyList()); verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)); verifyNoMoreInteractions(mockGcsUtil); } @@ -471,7 +471,7 @@ public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() thr public void testPackageUploadEventuallySucceeds() throws Exception { Pipe pipe = Pipe.open(); File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -489,7 +489,7 @@ public void testPackageUploadEventuallySucceeds() throws Exception { fastNanoClockAndSleeper::sleep, createOptions); } finally { - verify(mockGcsUtil, times(3)).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil, times(3)).getObjects(anyList()); verify(mockGcsUtil, times(3)).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)); verifyNoMoreInteractions(mockGcsUtil); } @@ -498,7 +498,7 @@ public void testPackageUploadEventuallySucceeds() throws Exception { @Test public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create( @@ -507,7 +507,7 @@ public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception { defaultPackageUtil.stageClasspathElements( ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())), STAGING_PATH, createOptions); - verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyList()); verifyNoMoreInteractions(mockGcsUtil); } @@ -519,7 +519,7 @@ public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exceptio tmpFolder.newFolder("folder", "directory"); makeFileWithContents("folder/file.txt", "This is a test!"); makeFileWithContents("folder/directory/file.txt", "This is also a test!"); - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create( @@ -532,7 +532,7 @@ public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exceptio STAGING_PATH, createOptions); - verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyList()); verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)); verifyNoMoreInteractions(mockGcsUtil); } @@ -543,7 +543,7 @@ public void testPackageUploadWithExplicitPackageName() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); final String overriddenName = "alias.txt"; - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -557,7 +557,7 @@ public void testPackageUploadWithExplicitPackageName() throws Exception { createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); - verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyList()); verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)); verifyNoMoreInteractions(mockGcsUtil); diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index ab7411ce2eb7..8fd2e241734d 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -228,7 +228,6 @@ dependencies { shadowTest library.java.jsonassert shadowTest library.java.junit shadowTest library.java.mockito_core - shadowTest library.java.powermock } project.task('validateShadedJarContainsSlf4jJdk14', dependsOn: 'shadowJar') { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java index 6dee1257b3eb..2e1872966ca1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java @@ -28,12 +28,9 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.LoggerFactory; +import org.junit.runners.JUnit4; -@RunWith(PowerMockRunner.class) -@PrepareForTest({HotKeyLoggerTest.class, LoggerFactory.class}) +@RunWith(JUnit4.class) public class HotKeyLoggerTest { @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(HotKeyLogger.class); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index 9e33188c3299..9fe424fe9894 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -2174,13 +2174,13 @@ public void testOrderedListClearBeforeRead() throws Exception { orderedListState.add(helloElement); assertThat(orderedListState.read(), Matchers.containsInAnyOrder(helloElement)); // Shouldn't need to read from windmill for this. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); assertThat( orderedListState.readRange(Instant.ofEpochSecond(1), Instant.ofEpochSecond(2)), Matchers.containsInAnyOrder(helloElement)); // Shouldn't need to read from windmill for this. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); // Shouldn't need to read from windmill for this. assertThat( @@ -2189,7 +2189,7 @@ public void testOrderedListClearBeforeRead() throws Exception { assertThat( orderedListState.readRange(Instant.EPOCH, Instant.ofEpochSecond(1)), Matchers.emptyIterable()); - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); } @Test @@ -2604,7 +2604,7 @@ public void testNewOrderedListNoFetch() throws Exception { assertThat(orderedList.read(), Matchers.emptyIterable()); // Shouldn't need to read from windmill for this. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); } @Test @@ -2640,7 +2640,7 @@ public void testBagClearBeforeRead() throws Exception { assertThat(bag.read(), Matchers.containsInAnyOrder("hello")); // Shouldn't need to read from windmill for this. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); } @Test @@ -2763,7 +2763,7 @@ public void testNewBagNoFetch() throws Exception { assertThat(bag.read(), Matchers.emptyIterable()); // Shouldn't need to read from windmill for this. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); } @Test @@ -2804,7 +2804,7 @@ public void testCombiningClearBeforeRead() throws Exception { assertThat(value.read(), Matchers.equalTo(13)); // Shouldn't need to read from windmill for this because we immediately cleared.. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); } @Test @@ -2876,9 +2876,9 @@ public void testCombiningAddPersistWithCompact() throws Exception { Mockito.when( mockReader.bagFuture( - org.mockito.Matchers.any(), - org.mockito.Matchers.any(), - org.mockito.Matchers.>any())) + org.mockito.ArgumentMatchers.any(), + org.mockito.ArgumentMatchers.any(), + org.mockito.ArgumentMatchers.>any())) .thenReturn(Futures.immediateFuture(ImmutableList.of(new int[] {40}, new int[] {60}))); GroupingState value = underTest.state(NAMESPACE, COMBINING_ADDR); @@ -2939,7 +2939,7 @@ public void testNewCombiningNoFetch() throws Exception { assertThat(value.isEmpty().read(), Matchers.is(false)); // Shouldn't need to read from windmill for this. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); } @Test @@ -3224,7 +3224,7 @@ public void testNewWatermarkNoFetch() throws Exception { assertThat(bag.read(), Matchers.nullValue()); // Shouldn't need to read from windmill for this. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); } @Test @@ -3330,7 +3330,7 @@ public void testNewValueNoFetch() throws Exception { assertNull(value.read()); // Shouldn't need to read from windmill for this. - Mockito.verifyZeroInteractions(mockReader); + Mockito.verifyNoInteractions(mockReader); } @Test diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/BundleFinalizationHandlersTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/BundleFinalizationHandlersTest.java index ea2f00e7e23b..be8e2ebddeda 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/BundleFinalizationHandlersTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/BundleFinalizationHandlersTest.java @@ -19,7 +19,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.verifyNoInteractions; import org.apache.beam.model.fnexecution.v1.BeamFnApi.FinalizeBundleRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest; @@ -41,11 +41,11 @@ public void testInMemoryFinalizer() { InMemoryFinalizer finalizer = BundleFinalizationHandlers.inMemoryFinalizer(mockHandler); finalizer.finalizeAllOutstandingBundles(); - verifyZeroInteractions(mockHandler); + verifyNoInteractions(mockHandler); finalizer.requestsFinalization("A"); finalizer.requestsFinalization("B"); - verifyZeroInteractions(mockHandler); + verifyNoInteractions(mockHandler); finalizer.finalizeAllOutstandingBundles(); verify(mockHandler).handle(requestFor("A")); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java index b72b3e8c4b33..1072e628a70c 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java @@ -21,7 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -149,7 +149,7 @@ public void testWhenTryFinishBundleThenBundleIsReset() { bundleManager.tryStartBundle(); bundleManager.tryFinishBundle(mockEmitter); - verify(mockEmitter, times(1)).emitFuture(anyObject()); + verify(mockEmitter, times(1)).emitFuture(any()); verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter); assertEquals( "Expected the number of element in the current bundle to be 0", @@ -172,7 +172,7 @@ public void testTryFinishBundleClosesBundleOnMaxWatermark() { bundleManager.tryStartBundle(); bundleManager.tryFinishBundle(mockEmitter); - verify(mockEmitter, times(1)).emitFuture(anyObject()); + verify(mockEmitter, times(1)).emitFuture(any()); verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter); assertEquals( "Expected the number of element in the current bundle to be 0", @@ -197,7 +197,7 @@ public void testTryFinishBundleShouldNotCloseBundle() { bundleManager.tryFinishBundle(mockEmitter); verify(mockFutureCollector, times(1)).finish(); - verify(mockEmitter, times(1)).emitFuture(anyObject()); + verify(mockEmitter, times(1)).emitFuture(any()); verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter); assertEquals( "Expected the number of element in the current bundle to be 1", @@ -216,7 +216,7 @@ public void testTryFinishBundleWhenNoBundleInProgress() { bundleManager.tryFinishBundle(mockEmitter); - verify(mockEmitter, times(1)).emitFuture(anyObject()); + verify(mockEmitter, times(1)).emitFuture(any()); assertNull( "tryFinishBundle() should not set the future when no bundle in progress", bundleManager.getCurrentBundleDoneFuture()); @@ -313,7 +313,7 @@ public void testProcessTimerWithBundleTimeElapsed() { bundleManager.tryStartBundle(); bundleManager.processTimer(mockTimer, mockEmitter); - verify(mockEmitter, times(1)).emitFuture(anyObject()); + verify(mockEmitter, times(1)).emitFuture(any()); verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter); assertEquals( "Expected the number of element in the current bundle to be 0", @@ -340,7 +340,7 @@ public void testProcessTimerWithTimeLessThanMaxBundleTime() { bundleManager.processTimer(mockTimer, mockEmitter); verify(mockFutureCollector, times(1)).finish(); - verify(mockEmitter, times(1)).emitFuture(anyObject()); + verify(mockEmitter, times(1)).emitFuture(any()); verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter); assertEquals( "Expected the number of element in the current bundle to be 1", @@ -364,7 +364,7 @@ public void testProcessTimerIgnoresNonBundleTimers() { bundleManager.processTimer(mockTimer, mockEmitter); verify(mockFutureCollector, times(0)).finish(); - verify(mockEmitter, times(0)).emitFuture(anyObject()); + verify(mockEmitter, times(0)).emitFuture(any()); verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter); assertEquals( "Expected the number of element in the current bundle to be 1", diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java index faa2e91d15bb..7276820e99e8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java @@ -23,8 +23,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import java.util.ArrayList; @@ -256,7 +256,7 @@ public void testStageBundleClosed() throws Exception { public void testNoCallOnEmptyInputIterator() throws Exception { SparkExecutableStageFunction function = getFunction(Collections.emptyMap()); function.call(Collections.emptyIterator()); - verifyZeroInteractions(stageBundleFactory); + verifyNoInteractions(stageBundleFactory); } private SparkExecutableStageFunction getFunction( diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/checkstyle.xml index 30ea7739f887..5cee5d2f33e2 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/checkstyle.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/checkstyle.xml @@ -127,14 +127,6 @@ page at http://checkstyle.sourceforge.net/config.html --> - - - - - - - - diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle index 7a24754fa722..04e12893bf7c 100644 --- a/sdks/java/expansion-service/build.gradle +++ b/sdks/java/expansion-service/build.gradle @@ -57,7 +57,3 @@ task runExpansionService (type: JavaExec) { classpath = sourceSets.main.runtimeClasspath args = [project.findProperty("constructionService.port") ?: "8097"] } - -compileJava { - outputs.upToDateWhen { false } -} \ No newline at end of file diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 2719c5cce244..2ad0a49b7030 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -165,8 +165,7 @@ dependencies { testImplementation library.java.commons_math3 testImplementation library.java.google_cloud_bigquery testImplementation library.java.mockito_core - testImplementation library.java.powermock - testImplementation library.java.powermock_mockito + testRuntimeOnly library.java.mockito_inline testImplementation library.java.joda_time testImplementation library.java.google_cloud_spanner_test testImplementation library.java.google_cloud_bigtable_emulator diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index aa8e3a411486..00d563772bfd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -218,7 +218,8 @@ public void populateDisplayData(DisplayData.Builder builder) { // ================================================================================ /** Publish messages to Pubsub in batches. */ - private static class WriterFn extends DoFn, Void> { + @VisibleForTesting + static class WriterFn extends DoFn, Void> { private final PubsubClientFactory pubsubFactory; private final @Nullable ValueProvider topic; private final String timestampAttribute; @@ -268,6 +269,16 @@ private static class WriterFn extends DoFn, Void> { this.pubsubRootUrl = pubsubRootUrl; } + @VisibleForTesting + String getIdAttribute() { + return idAttribute; + } + + @VisibleForTesting + ValueProvider getTopic() { + return topic; + } + /** BLOCKING Send {@code messages} as a batch to Pubsub. */ private void publishBatch(List messages, int bytes) throws IOException { Preconditions.checkState(!messages.isEmpty()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index 01c2db8d5e2e..cf3d11e78489 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -47,8 +47,8 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import com.google.datastore.v1.CommitRequest; @@ -983,7 +983,7 @@ public void testSplitQueryFnWithNumSplits() throws Exception { any(Datastore.class), eq(readTimeProto)); } - verifyZeroInteractions(mockDatastore); + verifyNoInteractions(mockDatastore); } /** Tests {@link SplitQueryFn} when no query splits is specified. */ diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java index 73328afb397b..623f947c45a7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java @@ -34,11 +34,11 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.spy; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.ApiException; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java index a221ee1d37d0..c2d137b800de 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -25,13 +25,13 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.expansion.service.ExpansionService; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaTranslation; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.util.construction.ParDoTranslation; @@ -45,7 +45,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.powermock.reflect.Whitebox; /** Tests for building {@link PubsubIO} externally via the ExpansionService. */ @RunWith(JUnit4.class) @@ -175,12 +174,12 @@ public void testConstructPubsubWrite() throws Exception { RunnerApi.ParDoPayload parDoPayload = RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload()); - DoFn pubsubWriter = ParDoTranslation.getDoFn(parDoPayload); + PubsubUnboundedSink.WriterFn pubsubWriter = + (PubsubUnboundedSink.WriterFn) ParDoTranslation.getDoFn(parDoPayload); - String idAttributeActual = (String) Whitebox.getInternalState(pubsubWriter, "idAttribute"); + String idAttributeActual = pubsubWriter.getIdAttribute(); - ValueProvider topicActual = - (ValueProvider) Whitebox.getInternalState(pubsubWriter, "topic"); + ValueProvider topicActual = pubsubWriter.getTopic(); assertThat(topicActual == null ? null : String.valueOf(topicActual), Matchers.is(topic)); assertThat(idAttributeActual, Matchers.is(idAttribute)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java index 5815bf0c6fdd..38e3b7fdfa0a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java @@ -20,7 +20,6 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -69,8 +68,7 @@ public void setUp() { interrupter = mock(RestrictionInterrupter.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); - when(dao.runInTransaction(any(), anyObject())) - .thenAnswer(new TestTransactionAnswer(transaction)); + when(dao.runInTransaction(any(), any())).thenAnswer(new TestTransactionAnswer(transaction)); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java index 83d1d8f2aa5e..dc35c2ea4934 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -97,13 +96,13 @@ public void setUp() { @Test public void testInsert() { - when(databaseClient.readWriteTransaction(anyObject())).thenReturn(readWriteTransactionRunner); + when(databaseClient.readWriteTransaction(any())).thenReturn(readWriteTransactionRunner); when(databaseClient.readWriteTransaction()).thenReturn(readWriteTransactionRunner); when(readWriteTransactionRunner.run(any())).thenReturn(null); when(readWriteTransactionRunner.getCommitTimestamp()) .thenReturn(Timestamp.ofTimeMicroseconds(1L)); Timestamp commitTimestamp = partitionMetadataDao.insert(ROW); - verify(databaseClient, times(1)).readWriteTransaction(anyObject()); + verify(databaseClient, times(1)).readWriteTransaction(any()); verify(readWriteTransactionRunner, times(1)).run(any()); verify(readWriteTransactionRunner, times(1)).getCommitTimestamp(); assertEquals(Timestamp.ofTimeMicroseconds(1L), commitTimestamp); @@ -145,7 +144,7 @@ public void testInTransactionContextCannotUpdateToRunning() { ArgumentCaptor> mutations = ArgumentCaptor.forClass(ImmutableList.class); ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); + when(transaction.executeQuery(any(), any())).thenReturn(resultSet); when(resultSet.next()).thenReturn(false); doNothing().when(transaction).buffer(mutations.capture()); @@ -157,7 +156,7 @@ public void testInTransactionContextCannotUpdateToRunning() { @Test public void testInTransactionContextUpdateToRunning() { ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); + when(transaction.executeQuery(any(), any())).thenReturn(resultSet); when(resultSet.next()).thenReturn(true); when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString()); when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build()); @@ -180,7 +179,7 @@ public void testInTransactionContextUpdateToRunning() { public void testInTransactionContextCannotUpdateToScheduled() { System.out.println("Cannot update to scheduled"); ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); + when(transaction.executeQuery(any(), any())).thenReturn(resultSet); when(resultSet.next()).thenReturn(false); ArgumentCaptor> mutations = @@ -194,7 +193,7 @@ public void testInTransactionContextCannotUpdateToScheduled() { public void testInTransactionContextUpdateToScheduled() { System.out.println(" update to scheduled"); ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); + when(transaction.executeQuery(any(), any())).thenReturn(resultSet); when(resultSet.next()).thenReturn(true).thenReturn(false); when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN); when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build()); @@ -252,7 +251,7 @@ public void testInTransactionContextUpdateWatermark() { @Test public void testInTransactionContextGetPartitionWithNoPartitions() { ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); + when(transaction.executeQuery(any(), any())).thenReturn(resultSet); when(resultSet.next()).thenReturn(false); assertNull(inTransactionContext.getPartition(PARTITION_TOKEN)); } @@ -260,7 +259,7 @@ public void testInTransactionContextGetPartitionWithNoPartitions() { @Test public void testInTransactionContextGetPartitionWithPartitions() { ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); + when(transaction.executeQuery(any(), any())).thenReturn(resultSet); when(resultSet.next()).thenReturn(true); when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build()); assertNotNull(inTransactionContext.getPartition(PARTITION_TOKEN)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClientTest.java index 1515bb8e879c..b1260e6fac7e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClientTest.java @@ -28,20 +28,20 @@ import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.QueryRequest; import java.io.IOException; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; /** Tests for {@link BigqueryClient}. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(BigqueryClient.class) +@RunWith(JUnit4.class) public class BigqueryClientTest { private final String projectId = "test-project"; private final String query = "test-query"; @@ -51,17 +51,23 @@ public class BigqueryClientTest { @Mock private Bigquery mockBigqueryClient; @Mock private Bigquery.Jobs mockJobs; @Mock private Bigquery.Jobs.Query mockQuery; + private MockedStatic mockStatic; @Before public void setUp() throws IOException { MockitoAnnotations.initMocks(this); when(mockBigqueryClient.jobs()).thenReturn(mockJobs); when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery); - PowerMockito.mockStatic(BigqueryClient.class); + mockStatic = Mockito.mockStatic(BigqueryClient.class); when(BigqueryClient.getNewBigqueryClient(anyString())).thenReturn(mockBigqueryClient); bqClient = spy(new BigqueryClient("test-app")); } + @After + public void tearDown() { + mockStatic.close(); + } + @Test public void testQueryWithRetriesWhenServiceFails() throws Exception { when(mockQuery.execute()).thenThrow(new IOException()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java index c6f863a536cd..8492e4608c68 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java @@ -27,35 +27,42 @@ import com.google.api.services.bigquery.model.TableRow; import java.math.BigInteger; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; /** Tests for {@link BigqueryMatcher}. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(BigqueryClient.class) +@RunWith(JUnit4.class) public class BigqueryMatcherTest { private final String appName = "test-app"; private final String projectId = "test-project"; private final String query = "test-query"; @Rule public ExpectedException thrown = ExpectedException.none(); - @Mock private BigqueryClient mockBigqueryClient; + @Mock public BigqueryClient mockBigqueryClient; + private MockedStatic mockStatic; @Before + @SuppressWarnings("CheckReturnValue") // mockStatic public void setUp() { MockitoAnnotations.initMocks(this); - PowerMockito.mockStatic(BigqueryClient.class); + mockStatic = Mockito.mockStatic(BigqueryClient.class); when(BigqueryClient.getClient(anyString())).thenReturn(mockBigqueryClient); } + @After + public void tearDown() { + mockStatic.close(); + } + @Test public void testBigqueryMatcherThatSucceeds() throws Exception { BigqueryMatcher matcher = diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java index 8e68d44f4aa9..ad33c57bedb7 100644 --- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.nullable; import java.io.IOException; import java.util.ArrayList; @@ -617,7 +618,7 @@ public void testReadIfCreateRecordReaderFails() throws Exception { InputFormat mockInputFormat = Mockito.mock(EmployeeInputFormat.class); Mockito.when( mockInputFormat.createRecordReader( - Mockito.any(InputSplit.class), Mockito.any(TaskAttemptContext.class))) + nullable(InputSplit.class), nullable(TaskAttemptContext.class))) .thenThrow(new IOException("Exception in creating RecordReader")); HadoopInputFormatBoundedSource boundedSource = new HadoopInputFormatBoundedSource<>( diff --git a/sdks/java/io/influxdb/build.gradle b/sdks/java/io/influxdb/build.gradle index 56833d00f453..dc6c85059cdb 100644 --- a/sdks/java/io/influxdb/build.gradle +++ b/sdks/java/io/influxdb/build.gradle @@ -30,10 +30,16 @@ dependencies { implementation "com.squareup.okhttp3:okhttp:4.6.0" implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit - testImplementation library.java.powermock - testImplementation library.java.powermock_mockito + // TODO(https://github.com/apache/beam/issues/34056) remove powermock once removed static mocks. + testImplementation "org.powermock:powermock-module-junit4:2.0.9" + testImplementation "org.powermock:powermock-api-mockito2:2.0.9" testImplementation library.java.mockito_core testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:io:common") } + +configurations.testRuntimeClasspath { + // Pin mockito-core 3 for now due to powermock not support mockito 4 + resolutionStrategy.force "org.mockito:mockito-core:3.7.7" +} diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 04563c478d6d..308023399e98 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -99,8 +99,8 @@ dependencies { // For testing Cross-language transforms testImplementation library.java.avro testImplementation library.java.junit - testImplementation library.java.powermock - testImplementation library.java.powermock_mockito + testImplementation library.java.mockito_core + testRuntimeOnly library.java.mockito_inline testImplementation library.java.testcontainers_kafka testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java index 6ac819e57057..f483c69d33bf 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; @@ -166,6 +167,11 @@ public void teardown() { } } + @VisibleForTesting + WriteRecords getSpec() { + return spec; + } + private synchronized void checkForFailures() throws IOException { if (numSendFailures == 0) { return; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java index 808058a4482c..e31a36a3c331 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaTranslation; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.ByteStringOutputStream; @@ -57,7 +56,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.powermock.reflect.Whitebox; /** Tests for building {@link KafkaIO} externally via the ExpansionService. */ @RunWith(JUnit4.class) @@ -369,9 +367,8 @@ public void testConstructKafkaWrite() throws Exception { RunnerApi.ParDoPayload parDoPayload = RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload()); - DoFn kafkaWriter = ParDoTranslation.getDoFn(parDoPayload); - assertThat(kafkaWriter, Matchers.instanceOf(KafkaWriter.class)); - KafkaIO.WriteRecords spec = Whitebox.getInternalState(kafkaWriter, "spec"); + KafkaWriter kafkaWriter = (KafkaWriter) ParDoTranslation.getDoFn(parDoPayload); + KafkaIO.WriteRecords spec = kafkaWriter.getSpec(); assertThat(spec.getProducerConfig(), Matchers.is(producerConfig)); assertThat(spec.getTopic(), Matchers.is(topic)); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoderTest.java index e0636350974f..6bc3fbaf02ac 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoderTest.java @@ -20,8 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.mockito.Mockito.mockStatic; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -36,12 +35,11 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.junit.runners.JUnit4; +import org.mockito.MockedStatic; /** Tests for {@link ProducerRecordCoder}. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(ConsumerSpEL.class) +@RunWith(JUnit4.class) public class ProducerRecordCoderTest { @Test public void testCoderIsSerializableWithWellKnownCoderType() { @@ -113,11 +111,13 @@ public void testProducerRecordStructuralValueWithoutHeadersApi() throws IOExcept ProducerRecord producerRecord = new ProducerRecord<>( "topic", 1, null, "key".getBytes(UTF_8), "value".getBytes(UTF_8), headers); - mockStatic(ConsumerSpEL.class); - when(ConsumerSpEL.hasHeaders()).thenReturn(false); - ProducerRecord testProducerRecord = - (ProducerRecord) producerRecordCoder.structuralValue(producerRecord); - assertEquals(testProducerRecord.headers(), new RecordHeaders()); + try (MockedStatic staticMock = mockStatic(ConsumerSpEL.class)) { + staticMock.when(ConsumerSpEL::hasHeaders).thenReturn(false); + ProducerRecord testProducerRecord = + (ProducerRecord) producerRecordCoder.structuralValue(producerRecord); + + assertEquals(testProducerRecord.headers(), new RecordHeaders()); + } } private ProducerRecord verifySerialization(Integer partition, Long timestamp)