CHANGES.md (1 addition, 0 deletions)
@@ -62,6 +62,7 @@

## I/Os

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
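
For context, a minimal usage sketch of the new option (not itself part of the changelog). It assumes the Beam GCP extensions and the gcs-connector are on the classpath; the builder calls are the ones exercised in the test added later in this PR, while the surrounding pipeline wiring is illustrative.

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class GcsReadOptionsExample {
  public static void main(String[] args) {
    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);

    // Configure gcs-connector read behavior.
    GoogleCloudStorageReadOptions readOptions =
        GoogleCloudStorageReadOptions.builder()
            .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO)
            .setSupportGzipEncoding(true)
            .build();

    // The property is @JsonIgnore/@Hidden, so it is set programmatically
    // rather than through command-line flags.
    options.setGoogleCloudStorageReadOptions(readOptions);

    Pipeline pipeline = Pipeline.create(options);
    // ... attach transforms that read from gs:// paths; GcsUtil will open
    // objects with the configured read options.
    pipeline.run().waitUntilFinish();
  }
}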

## New Features / Improvements
GcsOptions.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.gcp.options;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import java.util.concurrent.ExecutorService;
import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
@@ -44,6 +45,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline

void setGcsUtil(GcsUtil value);

@JsonIgnore
@Description(
"The GoogleCloudStorageReadOptions instance that should be used to read from Google Cloud Storage.")
@Default.InstanceFactory(GcsUtil.GcsReadOptionsFactory.class)
@Hidden
GoogleCloudStorageReadOptions getGoogleCloudStorageReadOptions();

void setGoogleCloudStorageReadOptions(GoogleCloudStorageReadOptions value);

/**
* The ExecutorService instance to use to create threads, can be overridden to specify an
* ExecutorService that is compatible with the user's environment. If unset, the default is to use
GcsUtil.java
@@ -123,6 +123,14 @@ public static GcsCountersOptions create(
}
}

public static class GcsReadOptionsFactory
implements DefaultValueFactory<GoogleCloudStorageReadOptions> {
@Override
public GoogleCloudStorageReadOptions create(PipelineOptions options) {
return GoogleCloudStorageReadOptions.DEFAULT;
}
}

/**
* This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport
* flags specified on the {@link PipelineOptions}.
@@ -153,7 +161,8 @@ public GcsUtil create(PipelineOptions options) {
: null,
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null));
: null),
gcsOptions.getGoogleCloudStorageReadOptions());
}

/** Returns an instance of {@link GcsUtil} based on the given parameters. */
@@ -164,7 +173,8 @@ public static GcsUtil create(
ExecutorService executorService,
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
return new GcsUtil(
storageClient,
httpRequestInitializer,
@@ -173,7 +183,8 @@ public static GcsUtil create(
credentials,
uploadBufferSizeBytes,
null,
gcsCountersOptions);
gcsCountersOptions,
gcsReadOptions);
}
}

@@ -249,7 +260,8 @@ public static boolean isWildcard(GcsPath spec) {
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
this.storageClient = storageClient;
this.httpRequestInitializer = httpRequestInitializer;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
@@ -260,6 +272,7 @@ public static boolean isWildcard(GcsPath spec) {
googleCloudStorageOptions =
GoogleCloudStorageOptions.builder()
.setAppName("Beam")
.setReadChannelOptions(gcsReadOptions)
.setGrpcEnabled(shouldUseGrpc)
.build();
googleCloudStorage =
@@ -565,7 +578,9 @@ private SeekableByteChannel wrapInCounting(
public SeekableByteChannel open(GcsPath path) throws IOException {
String bucket = path.getBucket();
SeekableByteChannel channel =
googleCloudStorage.open(new StorageResourceId(path.getBucket(), path.getObject()));
googleCloudStorage.open(
new StorageResourceId(path.getBucket(), path.getObject()),
this.googleCloudStorageOptions.getReadChannelOptions());
return wrapInCounting(channel, bucket);
}

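Taken together, the GcsUtil hunks above wire the configured read options through two points: the connector's storage options at client construction, and each open() call. A condensed sketch of that flow follows; the class and method names here are illustrative, but the connector calls are the ones visible in the diff.

import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;

class ReadOptionsPlumbingSketch {
  // 1. The read options taken from GcsOptions are attached to the connector's
  //    storage options when GcsUtil builds its GoogleCloudStorage client.
  static GoogleCloudStorageOptions storageOptions(GoogleCloudStorageReadOptions readOptions) {
    return GoogleCloudStorageOptions.builder()
        .setAppName("Beam")
        .setReadChannelOptions(readOptions) // new in this PR
        .build();
  }

  // 2. Every open() then forwards the configured read channel options to the connector.
  static SeekableByteChannel open(
      GoogleCloudStorage gcs, GoogleCloudStorageOptions options, String bucket, String object)
      throws IOException {
    return gcs.open(new StorageResourceId(bucket, object), options.getReadChannelOptions());
  }
}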
GcpCoreApiSurfaceTest.java
@@ -55,6 +55,8 @@ public void testGcpCoreApiSurface() throws Exception {
classesInPackage("com.google.api.services.storage"),
classesInPackage("com.google.auth"),
classesInPackage("com.fasterxml.jackson.annotation"),
classesInPackage("com.google.cloud.hadoop.gcsio"),

Review thread on this line:

@shunping (Collaborator), Nov 27, 2024: (comment text not captured in this view)

PR author (Contributor): Thanks for tracking that down! Pushed a fix. One of the precommit tests is still failing though:

org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarnessTest > testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers FAILED
    java.lang.AssertionError at GrpcCleanupRule.java:201

Not sure if/how this could be related to my PR.

classesInPackage("com.google.common.collect"), // Via gcs-connector ReadOptions builder
classesInPackage("java"),
classesInPackage("javax"),
classesInPackage("org.apache.beam.sdk"),
GcsUtilTest.java
@@ -177,6 +177,32 @@ public void testCreationWithGcsUtilProvided() {
assertSame(gcsUtil, pipelineOptions.getGcsUtil());
}

@Test
public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Exception {
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder()
.setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO)
.setSupportGzipEncoding(true)
.setFastFailOnNotFound(false)
.build();

GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGoogleCloudStorageReadOptions(readOptions);

GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class);
Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(SeekableByteChannel.class));
gcsUtil.setCloudStorageImpl(googleCloudStorageMock);

assertEquals(readOptions, pipelineOptions.getGoogleCloudStorageReadOptions());

// Assert read options are passed to GCS calls
pipelineOptions.getGcsUtil().open(GcsPath.fromUri("gs://bucket/path"));
Mockito.verify(googleCloudStorageMock, Mockito.times(1))
.open(StorageResourceId.fromStringPath("gs://bucket/path"), readOptions);
}

@Test
public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
@@ -1630,7 +1656,8 @@ public static GcsUtilMock createMock(PipelineOptions options) {
: null,
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null));
: null),
gcsOptions.getGoogleCloudStorageReadOptions());
}

private GcsUtilMock(
@@ -1641,7 +1668,8 @@ private GcsUtilMock(
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
super(
storageClient,
httpRequestInitializer,
Expand All @@ -1650,7 +1678,8 @@ private GcsUtilMock(
credentials,
uploadBufferSizeBytes,
rewriteDataOpBatchLimit,
gcsCountersOptions);
gcsCountersOptions,
gcsReadOptions);
}

@Override