Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
<avro.version>1.7.7</avro.version>
<bigquery.version>v2-rev248-1.21.0</bigquery.version>
<bigtable.version>0.2.3</bigtable.version>
<clouddebugger.version>v2-rev6-1.21.0</clouddebugger.version>
<dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
<datastore.version>v1beta2-rev1-4.0.0</datastore.version>
Expand Down
14 changes: 14 additions & 0 deletions sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,20 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-clouddebugger</artifactId>
<version>${clouddebugger.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.datastore.DatastoreV1;
import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package com.google.cloud.dataflow.sdk.options;

import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.cloud.dataflow.sdk.annotations.Experimental;

import javax.annotation.Nullable;

/**
* Options for controlling Cloud Debugger.
*/
Expand All @@ -32,5 +35,9 @@ public interface CloudDebuggerOptions {
@Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
boolean getEnableCloudDebugger();
void setEnableCloudDebugger(boolean enabled);
}

@Description("The Cloud Debugger debugee to associate with. This should not be set directly.")
@Hidden
@Nullable Debuggee getDebuggee();
void setDebuggee(Debuggee debuggee);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
Expand Down Expand Up @@ -420,6 +424,43 @@ private <T> PCollection<T> applyWindow(
return super.apply(new AssignWindows<>(transform), input);
}

private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) {
if (!options.getEnableCloudDebugger()) {
return;
}

if (options.getDebuggee() != null) {
throw new RuntimeException("Should not specify the debuggee");
}

Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier);
options.setDebuggee(debuggee);
}

private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) {
RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest();
registerReq.setDebuggee(new Debuggee()
.setProject(options.getProject())
.setUniquifier(uniquifier)
.setDescription(uniquifier)
.setAgentVersion("google.com/cloud-dataflow-java/v1"));

try {
RegisterDebuggeeResponse registerResponse =
debuggerClient.controller().debuggees().register(registerReq).execute();
Debuggee debuggee = registerResponse.getDebuggee();
if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) {
throw new RuntimeException("Unable to register with the debugger: " +
debuggee.getStatus().getDescription().getFormat());
}

return debuggee;
} catch (IOException e) {
throw new RuntimeException("Unable to register with the debugger: ", e);
}
}

@Override
public DataflowPipelineJob run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
Expand All @@ -428,9 +469,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
+ "related to Google Compute Engine usage and other Google Cloud Services.");

List<DataflowPackage> packages = options.getStager().stageFiles();
JobSpecification jobSpecification =
translator.translate(pipeline, this, packages);
Job newJob = jobSpecification.getJob();


// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
Expand All @@ -442,6 +481,15 @@ public DataflowPipelineJob run(Pipeline pipeline) {
int randomNum = new Random().nextInt(9000) + 1000;
String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC)
.print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum;

// Try to create a debuggee ID. This must happen before the job is translated since it may
// update the options.
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
maybeRegisterDebuggee(dataflowOptions, requestId);

JobSpecification jobSpecification =
translator.translate(pipeline, this, packages);
Job newJob = jobSpecification.getJob();
newJob.setClientRequestId(requestId);

String version = DataflowReleaseInfo.getReleaseInfo().getVersion();
Expand All @@ -450,7 +498,6 @@ public DataflowPipelineJob run(Pipeline pipeline) {
newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo());
// The Dataflow Service may write to the temporary directory directly, so
// must be verified.
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
if (!Strings.isNullOrEmpty(options.getTempLocation())) {
newJob.getEnvironment().setTempStoragePrefix(
dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.storage.Storage;
Expand Down Expand Up @@ -148,6 +149,14 @@ public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}

public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
return new Clouddebugger.Builder(getTransport(),
getJsonFactory(),
chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}

/**
* Returns a Dataflow client that does not automatically retry failed
* requests.
Expand Down