@@ -54,7 +54,6 @@
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
import com.google.cloud.dataflow.sdk.io.Sink.Writer;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.GcpOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
@@ -307,18 +306,7 @@ public List<Source> splitIntoBundles(long desiredBundleSizeBytes, PipelineOption
numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes);
} catch (Exception e) {
// Fallback in case estimated size is unavailable. TODO: fix this, it's horrible.

// 1. Try Dataflow's numWorkers, which will be 0 for other workers.
DataflowPipelineWorkerPoolOptions poolOptions =
options.as(DataflowPipelineWorkerPoolOptions.class);
if (poolOptions.getNumWorkers() > 0) {
LOG.warn("Estimated size of unavailable, using the number of workers {}",
poolOptions.getNumWorkers(), e);
numSplits = poolOptions.getNumWorkers();
} else {
// 2. Default to 12 in the unknown case.
numSplits = 12;
}
numSplits = 12;
}

// If the desiredBundleSize or number of workers results in 1 split, simply return
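For a quick sanity check on the arithmetic in the hunk above (the sizes below are illustrative values, not taken from this PR): an estimated size of 640 MB with a 64 MB desired bundle size rounds to 10 splits, and if the size estimate throws, the fallback is now simply the constant 12.

// Illustration only: hypothetical sizes, mirroring the rounding used in the hunk above.
long estimatedSizeBytes = 640L << 20;     // 640 MB (assumed value)
long desiredBundleSizeBytes = 64L << 20;  // 64 MB (assumed value)
long numSplits = Math.round(((double) estimatedSizeBytes) / desiredBundleSizeBytes);
// numSplits == 10; had getEstimatedSizeBytes(options) thrown, numSplits would now be 12.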
@@ -33,8 +33,7 @@
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.options.PubsubOptions;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -399,10 +398,9 @@ public String asPath() {
* the stream.
*
* <p>When running with a {@link PipelineRunner} that only supports bounded
* {@link PCollection PCollections} (such as {@link DirectPipelineRunner} or
* {@link DataflowPipelineRunner} without {@code --streaming}), only a bounded portion of the
* input Pub/Sub stream can be processed. As such, either {@link Bound#maxNumRecords(int)} or
* {@link Bound#maxReadTime(Duration)} must be set.
* {@link PCollection PCollections} (such as {@link DirectPipelineRunner}),
* only a bounded portion of the input Pub/Sub stream can be processed. As such, either
* {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set.
*/
public static class Read {
/**
@@ -728,7 +726,7 @@ private class PubsubReader extends DoFn<Void, T> {
@Override
public void processElement(ProcessContext c) throws IOException {
Pubsub pubsubClient =
Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class))
Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
.build();

String subscription;
@@ -1004,7 +1002,7 @@ private class PubsubWriter extends DoFn<T, Void> {
public void startBundle(Context c) {
this.output = new ArrayList<>();
this.pubsubClient =
Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class))
Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
.build();
}

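To make the updated Javadoc above concrete, here is a minimal usage sketch for consuming Pub/Sub on a bounded runner; the topic name and record limit are placeholders, and the snippet assumes the PubsubIO.Read.topic(...) factory and the Bound#maxNumRecords(int) setter referenced in that Javadoc.

// Sketch only: placeholder topic and limit. On a bounded runner such as
// DirectPipelineRunner, one of maxNumRecords/maxReadTime must be set.
Pipeline p = Pipeline.create(options);
PCollection<String> messages = p.apply(
    PubsubIO.Read.topic("projects/my-project/topics/my-topic")
        .maxNumRecords(1000));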
@@ -25,7 +25,6 @@
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
@@ -113,11 +112,10 @@
* <h3>Permissions</h3>
* <p>When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files
* on your local drive and remote text files on Google Cloud Storage that you have access to using
* your {@code gcloud} credentials. When running in the Dataflow service using
* {@link DataflowPipelineRunner}, the pipeline can only read and write files from GCS. For more
* information about permissions, see the Cloud Dataflow documentation on
* <a href="https://cloud.google.com/dataflow/security-and-permissions">Security and
* Permissions</a>.
* your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only
* read and write files from GCS. For more information about permissions, see the Cloud Dataflow
* documentation on <a href="https://cloud.google.com/dataflow/security-and-permissions">Security
* and Permissions</a>.
*/
public class TextIO {
/** The default coder, which returns each line of the input file as a string. */
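As an illustration of the permissions note above, the same TextIO transform covers both access modes; the paths below are placeholders, not taken from this PR.

// Sketch only: placeholder paths.
PCollection<String> localLines =
    p.apply(TextIO.Read.from("/tmp/input.txt"));              // local file, DirectPipelineRunner
PCollection<String> gcsLines =
    p.apply(TextIO.Read.from("gs://my-bucket/input-*.txt"));  // GCS, readable with gcloud credentials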
@@ -20,11 +20,11 @@
import com.google.api.services.dataflow.Dataflow;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
import com.google.cloud.dataflow.sdk.util.DataflowTransport;
import com.google.cloud.dataflow.sdk.util.GcsStager;
import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.util.PathValidator;
import com.google.cloud.dataflow.sdk.util.Stager;
import com.google.cloud.dataflow.sdk.util.Transport;

import com.fasterxml.jackson.annotation.JsonIgnore;

@@ -159,18 +159,11 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
@Override
public Dataflow create(PipelineOptions options) {
return Transport.newDataflowClient(options.as(DataflowPipelineOptions.class)).build();
return DataflowTransport.newDataflowClient(
options.as(DataflowPipelineOptions.class)).build();
}
}

/**
* Root URL for use with the Pubsub API.
*/
@Description("Root URL for use with the Pubsub API")
@Default.String("https://pubsub.googleapis.com")
String getPubsubRootUrl();
void setPubsubRootUrl(String value);

/**
* Whether to update the currently running pipeline with the same name as this one.
*
@@ -33,7 +33,7 @@ public interface DataflowPipelineOptions extends
PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
DataflowPipelineWorkerPoolOptions, BigQueryOptions,
GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
DataflowProfilingOptions {
DataflowProfilingOptions, PubsubOptions {

@Description("Project id. Required when running a Dataflow in the cloud. "
+ "See https://cloud.google.com/storage/docs/projects for further details.")
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.dataflow.sdk.options;

/**
* Properties that can be set when using Pubsub with the Beam SDK.
*/
Contributor: Apache header as of #100

Member Author: Done

@Description("Options that are used to configure BigQuery. See "
+ "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
Contributor: description is off

Member Author: Done

public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
PipelineOptions, StreamingOptions {

/**
* Root URL for use with the Pubsub API.
*/
@Description("Root URL for use with the Pubsub API")
@Default.String("https://pubsub.googleapis.com")
@Hidden
String getPubsubRootUrl();
void setPubsubRootUrl(String value);
}
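The point of the new interface is that PubsubIO can obtain its endpoint configuration through PipelineOptions.as(...) without referencing a Dataflow-specific options type, as the PubsubReader/PubsubWriter hunks above now do. A minimal sketch of that access pattern (the options instance here is created directly, purely for illustration):

// Sketch only: any PipelineOptions can be viewed as PubsubOptions via as().
PipelineOptions options = PipelineOptionsFactory.create();
PubsubOptions pubsubOptions = options.as(PubsubOptions.class);
String rootUrl = pubsubOptions.getPubsubRootUrl();  // defaults to https://pubsub.googleapis.com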
@@ -99,6 +99,7 @@
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
import com.google.cloud.dataflow.sdk.util.DataflowTransport;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
@@ -107,7 +108,6 @@
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.Reshuffle;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -444,7 +444,7 @@ private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniqu
throw new RuntimeException("Should not specify the debuggee");
}

Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
Clouddebugger debuggerClient = DataflowTransport.newClouddebuggerClient(options).build();
Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier);
options.setDebuggee(debuggee);

@@ -600,7 +600,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// regularly and need not be retried automatically.
DataflowPipelineJob dataflowPipelineJob =
new DataflowPipelineJob(options.getProject(), jobResult.getId(),
Transport.newRawDataflowClient(options).build(), aggregatorTransforms);
DataflowTransport.newRawDataflowClient(options).build(), aggregatorTransforms);

// If the service returned client request id, the SDK needs to compare it
// with the original id generated in the request, if they are not the same
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.dataflow.sdk.util;

import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory;
import static com.google.cloud.dataflow.sdk.util.Transport.getTransport;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.dataflow.Dataflow;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.common.collect.ImmutableList;

import java.net.MalformedURLException;
import java.net.URL;

/**
* Helpers for cloud communication.
*/
public class DataflowTransport {
Contributor: Dataflow or Gcp?

I assume these are used outside of Dataflow Service, e.g., PubSubIO on Flink.

Contributor: Oh I see you only moved Dataflow and Clouddebugger. Why only these?

Contributor: never mind. resolved.

Member Author: Yeah, I left all the GCP IO transports in Transport.java and moved out the two clients which we use only with Dataflow. In the future, cloud debugger could move out to be generic for others as well.


private static class ApiComponents {
public String rootUrl;
public String servicePath;

public ApiComponents(String root, String path) {
this.rootUrl = root;
this.servicePath = path;
}
}

private static ApiComponents apiComponentsFromUrl(String urlString) {
try {
URL url = new URL(urlString);
String rootUrl = url.getProtocol() + "://" + url.getHost() +
(url.getPort() > 0 ? ":" + url.getPort() : "");
return new ApiComponents(rootUrl, url.getPath());
} catch (MalformedURLException e) {
throw new RuntimeException("Invalid URL: " + urlString);
}
}

/**
* Returns a Google Cloud Dataflow client builder.
*/
public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
String servicePath = options.getDataflowEndpoint();
ApiComponents components;
if (servicePath.contains("://")) {
components = apiComponentsFromUrl(servicePath);
} else {
components = new ApiComponents(options.getApiRootUrl(), servicePath);
}

return new Dataflow.Builder(getTransport(),
getJsonFactory(),
chainHttpRequestInitializer(
options.getGcpCredential(),
// Do not log 404. It clutters the output and is possibly even required by the caller.
new RetryHttpRequestInitializer(ImmutableList.of(404))))
.setApplicationName(options.getAppName())
.setRootUrl(components.rootUrl)
.setServicePath(components.servicePath)
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}

public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
return new Clouddebugger.Builder(getTransport(),
getJsonFactory(),
chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}

/**
* Returns a Dataflow client that does not automatically retry failed
* requests.
*/
public static Dataflow.Builder
newRawDataflowClient(DataflowPipelineOptions options) {
return newDataflowClient(options)
.setHttpRequestInitializer(options.getGcpCredential())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}

private static HttpRequestInitializer chainHttpRequestInitializer(
Credential credential, HttpRequestInitializer httpRequestInitializer) {
if (credential == null) {
return httpRequestInitializer;
} else {
return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
}
}
}
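For reference, apiComponentsFromUrl above splits a full endpoint URL into a root URL and a service path, falling back to options.getApiRootUrl() when the configured endpoint contains no "://". A standalone sketch of that splitting, using an example endpoint string that is not taken from this PR:

import java.net.MalformedURLException;
import java.net.URL;

// Mirrors the URL splitting in apiComponentsFromUrl; the endpoint is an example value.
public class ApiComponentsDemo {
  public static void main(String[] args) throws MalformedURLException {
    URL url = new URL("https://dataflow.googleapis.com/v1b3/projects/");
    String rootUrl = url.getProtocol() + "://" + url.getHost()
        + (url.getPort() > 0 ? ":" + url.getPort() : "");
    String servicePath = url.getPath();
    System.out.println(rootUrl);      // https://dataflow.googleapis.com
    System.out.println(servicePath);  // /v1b3/projects/
  }
}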