From 00264cdd7ceb4264957fd07897552fdd703dfaa0 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 14 Oct 2024 10:41:36 -0400 Subject: [PATCH 1/8] Propagate gcs-connector options to GcsUtil --- .../google-cloud-platform-core/build.gradle | 4 + .../extensions/gcp/options/GcsOptions.java | 10 +++ .../beam/sdk/extensions/gcp/util/GcsUtil.java | 73 +++++++++++++++++-- .../extensions/gcp/GcpCoreApiSurfaceTest.java | 2 + .../sdk/extensions/gcp/util/GcsUtilTest.java | 62 +++++++++++++++- .../src/test/resources/test-hadoop-conf.xml | 14 ++++ 6 files changed, 157 insertions(+), 8 deletions(-) create mode 100644 sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle index 8d21df50006b..aff8239cdb9f 100644 --- a/sdks/java/extensions/google-cloud-platform-core/build.gradle +++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle @@ -58,6 +58,10 @@ dependencies { testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.mockito_core testRuntimeOnly library.java.slf4j_jdk14 + provided library.java.bigdataoss_gcs_connector + provided library.java.hadoop_common + testRuntimeOnly library.java.bigdataoss_gcs_connector + testImplementation library.java.hadoop_common } // Note that no runner is specified here, so tests running under this task should not be running diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 18d637254115..1285b88663e7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.gcp.options; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; @@ -44,6 +45,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline void setGcsUtil(GcsUtil value); + @JsonIgnore + @Description( + "The GoogleCloudStorageReadOptions instance that should be used to read from Google Cloud Storage.") + @Default.InstanceFactory(GcsUtil.GcsReadOptionsFactory.class) + @Hidden + GoogleCloudStorageReadOptions getGoogleCloudStorageReadOptions(); + + void setGoogleCloudStorageReadOptions(GoogleCloudStorageReadOptions value); + /** * The ExecutorService instance to use to create threads, can be overridden to specify an * ExecutorService that is compatible with the user's environment. If unset, the default is to use diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 8d3596f17b3b..efeb1dce1b68 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -39,6 +39,7 @@ import com.google.api.services.storage.model.StorageObject; import com.google.auth.Credentials; import com.google.auto.value.AutoValue; +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration; import com.google.cloud.hadoop.gcsio.CreateObjectOptions; import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; @@ -96,6 +97,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.conf.Configuration; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.slf4j.Logger; @@ -123,6 +125,58 @@ public static GcsCountersOptions create( } } + public static class GcsReadOptionsFactory + implements DefaultValueFactory { + @Override + public GoogleCloudStorageReadOptions create(PipelineOptions options) { + try { + // Check if gcs-connector-hadoop is loaded into classpath + Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration"); + Configuration config = new Configuration(); + return GoogleCloudStorageReadOptions.builder() + .setFastFailOnNotFound( + GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE + .get(config, config::getBoolean)) + .setSupportGzipEncoding( + GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE + .get(config, config::getBoolean)) + .setInplaceSeekLimit( + GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get( + config, config::getLong)) + .setFadvise( + GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FADVISE.get( + config, config::getEnum)) + .setMinRangeRequestSize( + GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get( + config, config::getInt)) + .setGrpcChecksumsEnabled( + GoogleHadoopFileSystemConfiguration.GCS_GRPC_CHECKSUMS_ENABLE.get( + config, config::getBoolean)) + .setGrpcReadTimeoutMillis( + GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_TIMEOUT_MS.get( + config, config::getLong)) + .setGrpcReadMessageTimeoutMillis( + GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.get( + config, config::getLong)) + .setGrpcReadMetadataTimeoutMillis( + GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_METADATA_TIMEOUT_MS.get( + config, config::getLong)) + .setGrpcReadZeroCopyEnabled( + GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_ZEROCOPY_ENABLE.get( + config, config::getBoolean)) + .setTraceLogEnabled( + GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_ENABLE.get( + config, config::getBoolean)) + .setTraceLogTimeThreshold( + GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_TIME_THRESHOLD_MS.get( + config, config::getLong)) + .build(); + } catch (ClassNotFoundException e) { + return GoogleCloudStorageReadOptions.DEFAULT; + } + } + } + /** * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport * flags specified on the {@link PipelineOptions}. @@ -153,7 +207,8 @@ public GcsUtil create(PipelineOptions options) { : null, gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() - : null)); + : null), + gcsOptions.getGoogleCloudStorageReadOptions()); } /** Returns an instance of {@link GcsUtil} based on the given parameters. */ @@ -164,7 +219,8 @@ public static GcsUtil create( ExecutorService executorService, Credentials credentials, @Nullable Integer uploadBufferSizeBytes, - GcsCountersOptions gcsCountersOptions) { + GcsCountersOptions gcsCountersOptions, + GoogleCloudStorageReadOptions gcsReadOptions) { return new GcsUtil( storageClient, httpRequestInitializer, @@ -173,7 +229,8 @@ public static GcsUtil create( credentials, uploadBufferSizeBytes, null, - gcsCountersOptions); + gcsCountersOptions, + gcsReadOptions); } } @@ -218,6 +275,7 @@ public static GcsUtil create( private GoogleCloudStorage googleCloudStorage; private GoogleCloudStorageOptions googleCloudStorageOptions; + private GoogleCloudStorageReadOptions googleCloudStorageReadOptions; private final int rewriteDataOpBatchLimit; @@ -249,7 +307,8 @@ public static boolean isWildcard(GcsPath spec) { Credentials credentials, @Nullable Integer uploadBufferSizeBytes, @Nullable Integer rewriteDataOpBatchLimit, - GcsCountersOptions gcsCountersOptions) { + GcsCountersOptions gcsCountersOptions, + GoogleCloudStorageReadOptions gcsReadOptions) { this.storageClient = storageClient; this.httpRequestInitializer = httpRequestInitializer; this.uploadBufferSizeBytes = uploadBufferSizeBytes; @@ -257,9 +316,11 @@ public static boolean isWildcard(GcsPath spec) { this.credentials = credentials; this.maxBytesRewrittenPerCall = null; this.numRewriteTokensUsed = null; + this.googleCloudStorageReadOptions = gcsReadOptions; googleCloudStorageOptions = GoogleCloudStorageOptions.builder() .setAppName("Beam") + .setReadChannelOptions(this.googleCloudStorageReadOptions) .setGrpcEnabled(shouldUseGrpc) .build(); googleCloudStorage = @@ -565,7 +626,9 @@ private SeekableByteChannel wrapInCounting( public SeekableByteChannel open(GcsPath path) throws IOException { String bucket = path.getBucket(); SeekableByteChannel channel = - googleCloudStorage.open(new StorageResourceId(path.getBucket(), path.getObject())); + googleCloudStorage.open( + new StorageResourceId(path.getBucket(), path.getObject()), + this.googleCloudStorageReadOptions); return wrapInCounting(channel, bucket); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index f5075a3f2c55..2a1c17f5602f 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -55,6 +55,8 @@ public void testGcpCoreApiSurface() throws Exception { classesInPackage("com.google.api.services.storage"), classesInPackage("com.google.auth"), classesInPackage("com.fasterxml.jackson.annotation"), + classesInPackage("com.google.cloud.hadoop.gcsio"), + classesInPackage("com.google.common.collect"), // via GoogleCloudStorageReadOptions classesInPackage("java"), classesInPackage("javax"), classesInPackage("org.apache.beam.sdk"), diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index bd7f46ec8951..d8fafc56cc7b 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -112,6 +112,7 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Before; import org.junit.Rule; @@ -177,6 +178,58 @@ public void testCreationWithGcsUtilProvided() { assertSame(gcsUtil, pipelineOptions.getGcsUtil()); } + @Test + public void testCreationWithDefaultGoogleCloudStorageReadOptions() throws Exception { + Configuration.addDefaultResource("test-hadoop-conf.xml"); + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class); + Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(SeekableByteChannel.class)); + gcsUtil.setCloudStorageImpl(googleCloudStorageMock); + + GoogleCloudStorageReadOptions expectedOptions = + GoogleCloudStorageReadOptions.builder() + .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO) + .setSupportGzipEncoding(true) + .setFastFailOnNotFound(false) + .build(); + + assertEquals(expectedOptions, pipelineOptions.getGoogleCloudStorageReadOptions()); + + // Assert read options are passed to GCS calls + pipelineOptions.getGcsUtil().open(GcsPath.fromUri("gs://bucket/path")); + Mockito.verify(googleCloudStorageMock, Mockito.times(1)) + .open(StorageResourceId.fromStringPath("gs://bucket/path"), expectedOptions); + } + + @Test + public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Exception { + GoogleCloudStorageReadOptions readOptions = + GoogleCloudStorageReadOptions.builder() + .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO) + .setSupportGzipEncoding(true) + .setFastFailOnNotFound(false) + .build(); + + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + pipelineOptions.setGoogleCloudStorageReadOptions(readOptions); + + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class); + Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(SeekableByteChannel.class)); + gcsUtil.setCloudStorageImpl(googleCloudStorageMock); + + assertEquals(readOptions, pipelineOptions.getGoogleCloudStorageReadOptions()); + + // Assert read options are passed to GCS calls + pipelineOptions.getGcsUtil().open(GcsPath.fromUri("gs://bucket/path")); + Mockito.verify(googleCloudStorageMock, Mockito.times(1)) + .open(StorageResourceId.fromStringPath("gs://bucket/path"), readOptions); + } + @Test public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception { GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); @@ -1630,7 +1683,8 @@ public static GcsUtilMock createMock(PipelineOptions options) { : null, gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() - : null)); + : null), + gcsOptions.getGoogleCloudStorageReadOptions()); } private GcsUtilMock( @@ -1641,7 +1695,8 @@ private GcsUtilMock( Credentials credentials, @Nullable Integer uploadBufferSizeBytes, @Nullable Integer rewriteDataOpBatchLimit, - GcsCountersOptions gcsCountersOptions) { + GcsCountersOptions gcsCountersOptions, + GoogleCloudStorageReadOptions gcsReadOptions) { super( storageClient, httpRequestInitializer, @@ -1650,7 +1705,8 @@ private GcsUtilMock( credentials, uploadBufferSizeBytes, rewriteDataOpBatchLimit, - gcsCountersOptions); + gcsCountersOptions, + gcsReadOptions); } @Override diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml b/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml new file mode 100644 index 000000000000..2586eabb3a3e --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml @@ -0,0 +1,14 @@ + + + fs.gs.inputstream.fast.fail.on.not.found.enable + false + + + fs.gs.inputstream.support.gzip.encoding.enable + true + + + fs.gs.inputstream.fadvise + AUTO + + \ No newline at end of file From fe24a1ef7db78cf7c2d1c802904197263ee8a47f Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 14 Oct 2024 10:55:01 -0400 Subject: [PATCH 2/8] newline --- .../src/test/resources/test-hadoop-conf.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml b/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml index 2586eabb3a3e..9bb55c8d678d 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml @@ -11,4 +11,4 @@ fs.gs.inputstream.fadvise AUTO - \ No newline at end of file + From ed811a3d0fd7d6bc4a97f407ade89badcbfa4595 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 15 Oct 2024 10:10:09 -0400 Subject: [PATCH 3/8] Update CHANGES.md --- CHANGES.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 979cbbd67329..b6e9ac833c85 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,10 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686)) +* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688)) +* gcs-connector config options can be set via GcsOptions; otherwise will be automatically loaded from default Configuration (Java) + ## New Features / Improvements From 661a82606b971ec12beebcea5f9043ed2b3847b5 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 25 Nov 2024 11:31:49 -0500 Subject: [PATCH 4/8] Remove Hadoop dependency --- CHANGES.md | 5 +- .../google-cloud-platform-core/build.gradle | 2 - .../beam/sdk/extensions/gcp/util/GcsUtil.java | 48 +------------------ .../extensions/gcp/GcpCoreApiSurfaceTest.java | 1 - .../sdk/extensions/gcp/util/GcsUtilTest.java | 27 ----------- .../src/test/resources/test-hadoop-conf.xml | 14 ------ 6 files changed, 2 insertions(+), 95 deletions(-) delete mode 100644 sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml diff --git a/CHANGES.md b/CHANGES.md index b6e9ac833c85..01b81bc5d6a6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,10 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686)) -* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688)) -* gcs-connector config options can be set via GcsOptions; otherwise will be automatically loaded from default Configuration (Java) - +* gcs-connector config options can be set via GcsOptions (Java) ## New Features / Improvements diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle index aff8239cdb9f..493bb6b05018 100644 --- a/sdks/java/extensions/google-cloud-platform-core/build.gradle +++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle @@ -59,9 +59,7 @@ dependencies { testImplementation library.java.mockito_core testRuntimeOnly library.java.slf4j_jdk14 provided library.java.bigdataoss_gcs_connector - provided library.java.hadoop_common testRuntimeOnly library.java.bigdataoss_gcs_connector - testImplementation library.java.hadoop_common } // Note that no runner is specified here, so tests running under this task should not be running diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index efeb1dce1b68..5f3a7a89e8e6 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -39,7 +39,6 @@ import com.google.api.services.storage.model.StorageObject; import com.google.auth.Credentials; import com.google.auto.value.AutoValue; -import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration; import com.google.cloud.hadoop.gcsio.CreateObjectOptions; import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; @@ -97,7 +96,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors; -import org.apache.hadoop.conf.Configuration; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.slf4j.Logger; @@ -129,51 +127,7 @@ public static class GcsReadOptionsFactory implements DefaultValueFactory { @Override public GoogleCloudStorageReadOptions create(PipelineOptions options) { - try { - // Check if gcs-connector-hadoop is loaded into classpath - Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration"); - Configuration config = new Configuration(); - return GoogleCloudStorageReadOptions.builder() - .setFastFailOnNotFound( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE - .get(config, config::getBoolean)) - .setSupportGzipEncoding( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE - .get(config, config::getBoolean)) - .setInplaceSeekLimit( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get( - config, config::getLong)) - .setFadvise( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FADVISE.get( - config, config::getEnum)) - .setMinRangeRequestSize( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get( - config, config::getInt)) - .setGrpcChecksumsEnabled( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_CHECKSUMS_ENABLE.get( - config, config::getBoolean)) - .setGrpcReadTimeoutMillis( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_TIMEOUT_MS.get( - config, config::getLong)) - .setGrpcReadMessageTimeoutMillis( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.get( - config, config::getLong)) - .setGrpcReadMetadataTimeoutMillis( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_METADATA_TIMEOUT_MS.get( - config, config::getLong)) - .setGrpcReadZeroCopyEnabled( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_ZEROCOPY_ENABLE.get( - config, config::getBoolean)) - .setTraceLogEnabled( - GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_ENABLE.get( - config, config::getBoolean)) - .setTraceLogTimeThreshold( - GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_TIME_THRESHOLD_MS.get( - config, config::getLong)) - .build(); - } catch (ClassNotFoundException e) { - return GoogleCloudStorageReadOptions.DEFAULT; - } + return GoogleCloudStorageReadOptions.DEFAULT; } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index 2a1c17f5602f..9415d566b8fd 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -56,7 +56,6 @@ public void testGcpCoreApiSurface() throws Exception { classesInPackage("com.google.auth"), classesInPackage("com.fasterxml.jackson.annotation"), classesInPackage("com.google.cloud.hadoop.gcsio"), - classesInPackage("com.google.common.collect"), // via GoogleCloudStorageReadOptions classesInPackage("java"), classesInPackage("javax"), classesInPackage("org.apache.beam.sdk"), diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index d8fafc56cc7b..97082572ce41 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -112,7 +112,6 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Before; import org.junit.Rule; @@ -178,32 +177,6 @@ public void testCreationWithGcsUtilProvided() { assertSame(gcsUtil, pipelineOptions.getGcsUtil()); } - @Test - public void testCreationWithDefaultGoogleCloudStorageReadOptions() throws Exception { - Configuration.addDefaultResource("test-hadoop-conf.xml"); - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class); - Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) - .thenReturn(Mockito.mock(SeekableByteChannel.class)); - gcsUtil.setCloudStorageImpl(googleCloudStorageMock); - - GoogleCloudStorageReadOptions expectedOptions = - GoogleCloudStorageReadOptions.builder() - .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO) - .setSupportGzipEncoding(true) - .setFastFailOnNotFound(false) - .build(); - - assertEquals(expectedOptions, pipelineOptions.getGoogleCloudStorageReadOptions()); - - // Assert read options are passed to GCS calls - pipelineOptions.getGcsUtil().open(GcsPath.fromUri("gs://bucket/path")); - Mockito.verify(googleCloudStorageMock, Mockito.times(1)) - .open(StorageResourceId.fromStringPath("gs://bucket/path"), expectedOptions); - } - @Test public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Exception { GoogleCloudStorageReadOptions readOptions = diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml b/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml deleted file mode 100644 index 9bb55c8d678d..000000000000 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - fs.gs.inputstream.fast.fail.on.not.found.enable - false - - - fs.gs.inputstream.support.gzip.encoding.enable - true - - - fs.gs.inputstream.fadvise - AUTO - - From 3a1b9a8206cf0e8c92b71c92b27d4cc79b126129 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 25 Nov 2024 12:56:41 -0500 Subject: [PATCH 5/8] Remove unused deps --- sdks/java/extensions/google-cloud-platform-core/build.gradle | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle index 493bb6b05018..8d21df50006b 100644 --- a/sdks/java/extensions/google-cloud-platform-core/build.gradle +++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle @@ -58,8 +58,6 @@ dependencies { testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.mockito_core testRuntimeOnly library.java.slf4j_jdk14 - provided library.java.bigdataoss_gcs_connector - testRuntimeOnly library.java.bigdataoss_gcs_connector } // Note that no runner is specified here, so tests running under this task should not be running From 2432dba28bcd71f06f54bbe3cef368315b97d989 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 25 Nov 2024 13:01:38 -0500 Subject: [PATCH 6/8] Drop googleCloudStorageReadOptions member variable --- .../org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 5f3a7a89e8e6..d58154132a72 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -229,7 +229,6 @@ public static GcsUtil create( private GoogleCloudStorage googleCloudStorage; private GoogleCloudStorageOptions googleCloudStorageOptions; - private GoogleCloudStorageReadOptions googleCloudStorageReadOptions; private final int rewriteDataOpBatchLimit; @@ -270,11 +269,10 @@ public static boolean isWildcard(GcsPath spec) { this.credentials = credentials; this.maxBytesRewrittenPerCall = null; this.numRewriteTokensUsed = null; - this.googleCloudStorageReadOptions = gcsReadOptions; googleCloudStorageOptions = GoogleCloudStorageOptions.builder() .setAppName("Beam") - .setReadChannelOptions(this.googleCloudStorageReadOptions) + .setReadChannelOptions(gcsReadOptions) .setGrpcEnabled(shouldUseGrpc) .build(); googleCloudStorage = @@ -582,7 +580,7 @@ public SeekableByteChannel open(GcsPath path) throws IOException { SeekableByteChannel channel = googleCloudStorage.open( new StorageResourceId(path.getBucket(), path.getObject()), - this.googleCloudStorageReadOptions); + this.googleCloudStorageOptions.getReadChannelOptions()); return wrapInCounting(channel, bucket); } From efb4035ed0c8eef5dc1f52886585bc699c4aecbf Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 26 Nov 2024 22:39:26 -0500 Subject: [PATCH 7/8] add missing package --- .../apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index 9415d566b8fd..26d98125a3af 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -56,6 +56,7 @@ public void testGcpCoreApiSurface() throws Exception { classesInPackage("com.google.auth"), classesInPackage("com.fasterxml.jackson.annotation"), classesInPackage("com.google.cloud.hadoop.gcsio"), + classesInPackage("com.google.common.collect"), // Via gcs-connector ReadOptions builder classesInPackage("java"), classesInPackage("javax"), classesInPackage("org.apache.beam.sdk"), From 27672abb10c9878870694714c916aa9919fee304 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 3 Dec 2024 09:53:23 -0500 Subject: [PATCH 8/8] fixup CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 948b903bb162..fc32398a7a5a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ ## I/Os * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)). +* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## New Features / Improvements