diff --git a/CHANGES.md b/CHANGES.md index 44f5fe88c4dc..2c00cd02e7d1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,6 +62,7 @@ ## I/Os +* Support gcs-connector 3.x+ in GcsUtil ([#33368](https://github.com/apache/beam/pull/33368)) * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## New Features / Improvements diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index d58154132a72..caa59c87b5dd 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -30,6 +30,7 @@ import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; @@ -52,6 +53,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.AccessDeniedException; @@ -739,7 +741,47 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO GoogleCloudStorage createGoogleCloudStorage( GoogleCloudStorageOptions options, Storage storage, Credentials credentials) { - return new GoogleCloudStorageImpl(options, storage, credentials); + try { + return new GoogleCloudStorageImpl(options, storage, credentials); + } catch (NoSuchMethodError e) { + // gcs-connector 3.x drops the direct constructor and exclusively uses Builder + // TODO eliminate reflection once Beam drops Java 8 support and upgrades to gcsio 3.x + try { + final Method builderMethod = GoogleCloudStorageImpl.class.getMethod("builder"); + Object builder = builderMethod.invoke(null); + final Class builderClass = + Class.forName( + "com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder"); + + final Method setOptionsMethod = + builderClass.getMethod("setOptions", GoogleCloudStorageOptions.class); + setOptionsMethod.setAccessible(true); + builder = setOptionsMethod.invoke(builder, options); + + final Method setHttpTransportMethod = + builderClass.getMethod("setHttpTransport", HttpTransport.class); + setHttpTransportMethod.setAccessible(true); + builder = + setHttpTransportMethod.invoke(builder, storage.getRequestFactory().getTransport()); + + final Method setCredentialsMethod = + builderClass.getMethod("setCredentials", Credentials.class); + setCredentialsMethod.setAccessible(true); + builder = setCredentialsMethod.invoke(builder, credentials); + + final Method setHttpRequestInitializerMethod = + builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class); + setHttpRequestInitializerMethod.setAccessible(true); + builder = setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer); + + final Method buildMethod = builderClass.getMethod("build"); + buildMethod.setAccessible(true); + return (GoogleCloudStorage) buildMethod.invoke(builder); + } catch (Exception reflectionError) { + throw new RuntimeException( + "Failed to construct GoogleCloudStorageImpl from gcsio 3.x Builder", reflectionError); + } + } } /**