diff --git a/dataproc/pom.xml b/dataproc/pom.xml
index 87993bcc842..240b527bc1b 100644
--- a/dataproc/pom.xml
+++ b/dataproc/pom.xml
@@ -36,7 +36,27 @@
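+
+  <!-- Importing the libraries-bom keeps Google Cloud client-library versions consistent. -->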
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>com.google.cloud</groupId>
+        <artifactId>libraries-bom</artifactId>
+        <version>3.3.0</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-dataproc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-storage</artifactId>
+    </dependency>
       <groupId>junit</groupId>
@@ -44,11 +64,6 @@
       <version>4.12</version>
       <scope>test</scope>
-    <dependency>
-      <groupId>com.google.cloud</groupId>
-      <artifactId>google-cloud-dataproc</artifactId>
-      <version>0.118.0</version>
-    </dependency>
\ No newline at end of file
diff --git a/dataproc/src/main/java/CreateCluster.java b/dataproc/src/main/java/CreateCluster.java
index f68852e8e2e..0815a35ffe0 100644
--- a/dataproc/src/main/java/CreateCluster.java
+++ b/dataproc/src/main/java/CreateCluster.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2019 Google Inc.
+ * Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,57 +27,58 @@
public class CreateCluster {
+ public static void createCluster() throws IOException, InterruptedException {
+ // TODO(developer): Replace these variables before running the sample.
+ String projectId = "your-project-id";
+ String region = "your-project-region";
+ String clusterName = "your-cluster-name";
+ createCluster(projectId, region, clusterName);
+ }
+
public static void createCluster(String projectId, String region, String clusterName)
throws IOException, InterruptedException {
String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
- // Configure the settings for the cluster controller client
+ // Configure the settings for the cluster controller client.
ClusterControllerSettings clusterControllerSettings =
ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
- // Create a cluster controller client with the configured settings. We only need to create
- // the client once, and can be reused for multiple requests. Using a try-with-resources
- // will close the client for us, but this can also be done manually with the .close() method.
- try (ClusterControllerClient clusterControllerClient = ClusterControllerClient
- .create(clusterControllerSettings)) {
- // Configure the settings for our cluster
- InstanceGroupConfig masterConfig = InstanceGroupConfig.newBuilder()
- .setMachineTypeUri("n1-standard-1")
- .setNumInstances(1)
- .build();
- InstanceGroupConfig workerConfig = InstanceGroupConfig.newBuilder()
- .setMachineTypeUri("n1-standard-1")
- .setNumInstances(2)
- .build();
- ClusterConfig clusterConfig = ClusterConfig.newBuilder()
- .setMasterConfig(masterConfig)
- .setWorkerConfig(workerConfig)
- .build();
- // Create the cluster object with the desired cluster config
- Cluster cluster = Cluster.newBuilder()
- .setClusterName(clusterName)
- .setConfig(clusterConfig)
- .build();
+ // Create a cluster controller client with the configured settings. The client only needs to be
+ // created once and can be reused for multiple requests. Using a try-with-resources
+ // closes the client, but this can also be done manually with the .close() method.
+ try (ClusterControllerClient clusterControllerClient =
+ ClusterControllerClient.create(clusterControllerSettings)) {
+ // Configure the settings for our cluster.
+ InstanceGroupConfig masterConfig =
+ InstanceGroupConfig.newBuilder()
+ .setMachineTypeUri("n1-standard-1")
+ .setNumInstances(1)
+ .build();
+ InstanceGroupConfig workerConfig =
+ InstanceGroupConfig.newBuilder()
+ .setMachineTypeUri("n1-standard-1")
+ .setNumInstances(2)
+ .build();
+ ClusterConfig clusterConfig =
+ ClusterConfig.newBuilder()
+ .setMasterConfig(masterConfig)
+ .setWorkerConfig(workerConfig)
+ .build();
+ // Create the cluster object with the desired cluster config.
+ Cluster cluster =
+ Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
- // Create the Cloud Dataproc cluster
+ // Create the Cloud Dataproc cluster.
       OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
clusterControllerClient.createClusterAsync(projectId, region, cluster);
Cluster response = createClusterAsyncRequest.get();
- // Print out a success message
- System.out.println(
- String.format("Cluster created successfully: %s", response.getClusterName())
- );
+ // Print out a success message.
+      System.out.printf("Cluster created successfully: %s%n", response.getClusterName());
- } catch (IOException e) {
- // Likely this would occur due to issues authenticating with GCP. Make sure the environment
- // variable GOOGLE_APPLICATION_CREDENTIALS is configured.
- System.out.println("Error creating the cluster controller client: \n" + e.toString());
} catch (ExecutionException e) {
- // Common issues for this include needing to increase compute engine quotas or a cluster of
- // the same name already exists.
- System.out.println("Error during cluster creation request: \n" + e.toString());
+      System.err.println(String.format("Error executing createCluster: %s", e.getMessage()));
}
}
}
-// [END dataproc_create_cluster]
\ No newline at end of file
+// [END dataproc_create_cluster]
diff --git a/dataproc/src/main/java/Quickstart.java b/dataproc/src/main/java/Quickstart.java
new file mode 100644
index 00000000000..0a0d177b1d0
--- /dev/null
+++ b/dataproc/src/main/java/Quickstart.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// [START dataproc_quickstart]
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.dataproc.v1.InstanceGroupConfig;
+import com.google.cloud.dataproc.v1.Job;
+import com.google.cloud.dataproc.v1.JobControllerClient;
+import com.google.cloud.dataproc.v1.JobControllerSettings;
+import com.google.cloud.dataproc.v1.JobPlacement;
+import com.google.cloud.dataproc.v1.PySparkJob;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.protobuf.Empty;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class Quickstart {
+
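+  /** Polls the job until it reaches a terminal state: DONE, CANCELLED, or ERROR. */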
+ public static Job waitForJobCompletion(
+ JobControllerClient jobControllerClient, String projectId, String region, String jobId) {
+ while (true) {
+ // Poll the service periodically until the Job is in a finished state.
+ Job jobInfo = jobControllerClient.getJob(projectId, region, jobId);
+ switch (jobInfo.getStatus().getState()) {
+ case DONE:
+ case CANCELLED:
+ case ERROR:
+ return jobInfo;
+ default:
+ try {
+ // Wait a second in between polling attempts.
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ public static void quickstart() throws IOException, InterruptedException {
+ // TODO(developer): Replace these variables before running the sample.
+ String projectId = "your-project-id";
+ String region = "your-project-region";
+ String clusterName = "your-cluster-name";
+ String jobFilePath = "your-job-file-path";
+ quickstart(projectId, region, clusterName, jobFilePath);
+ }
+
+ public static void quickstart(
+ String projectId, String region, String clusterName, String jobFilePath)
+ throws IOException, InterruptedException {
+ String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+ // Configure the settings for the cluster controller client.
+ ClusterControllerSettings clusterControllerSettings =
+ ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+ // Configure the settings for the job controller client.
+ JobControllerSettings jobControllerSettings =
+ JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+ // Create both a cluster controller client and job controller client with the configured
+ // settings. The client only needs to be created once and can be reused for multiple requests.
+ // Using a try-with-resources closes the client, but this can also be done manually with
+ // the .close() method.
+ try (ClusterControllerClient clusterControllerClient =
+ ClusterControllerClient.create(clusterControllerSettings);
+ JobControllerClient jobControllerClient =
+ JobControllerClient.create(jobControllerSettings)) {
+ // Configure the settings for our cluster.
+ InstanceGroupConfig masterConfig =
+ InstanceGroupConfig.newBuilder()
+ .setMachineTypeUri("n1-standard-1")
+ .setNumInstances(1)
+ .build();
+ InstanceGroupConfig workerConfig =
+ InstanceGroupConfig.newBuilder()
+ .setMachineTypeUri("n1-standard-1")
+ .setNumInstances(2)
+ .build();
+ ClusterConfig clusterConfig =
+ ClusterConfig.newBuilder()
+ .setMasterConfig(masterConfig)
+ .setWorkerConfig(workerConfig)
+ .build();
+ // Create the cluster object with the desired cluster config.
+ Cluster cluster =
+ Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
+
+ // Create the Cloud Dataproc cluster.
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+ clusterControllerClient.createClusterAsync(projectId, region, cluster);
+ Cluster response = createClusterAsyncRequest.get();
+      System.out.printf("Cluster created successfully: %s%n", response.getClusterName());
+
+ // Configure the settings for our job.
+ JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
+ PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
+ Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();
+
+ // Submit an asynchronous request to execute the job.
+ Job request = jobControllerClient.submitJob(projectId, region, job);
+ String jobId = request.getReference().getJobId();
+ System.out.println(String.format("Submitted job \"%s\"", jobId));
+
+ // Wait for the job to finish.
+      CompletableFuture<Job> finishedJobFuture =
+ CompletableFuture.supplyAsync(
+ () -> waitForJobCompletion(jobControllerClient, projectId, region, jobId));
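+      // Allow up to 10 minutes for the job to reach a terminal state before timing out.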
+ int timeout = 10;
+ try {
+ Job jobInfo = finishedJobFuture.get(timeout, TimeUnit.MINUTES);
+        System.out.printf("Job %s finished.%n", jobId);
+
+        // Dataproc saves the job's driver output to the Cloud Storage bucket allocated to the cluster.
+ Cluster clusterInfo = clusterControllerClient.getCluster(projectId, region, clusterName);
+ Storage storage = StorageOptions.getDefaultInstance().getService();
+ Blob blob =
+ storage.get(
+ clusterInfo.getConfig().getConfigBucket(),
+ String.format(
+ "google-cloud-dataproc-metainfo/%s/jobs/%s/driveroutput.000000000",
+ clusterInfo.getClusterUuid(), jobId));
+ System.out.println(
+ String.format(
+ "Job \"%s\" finished with state %s:\n%s",
+ jobId, jobInfo.getStatus().getState(), new String(blob.getContent())));
+ } catch (TimeoutException e) {
+ System.err.println(
+ String.format("Job timed out after %d minutes: %s", timeout, e.getMessage()));
+ }
+
+ // Delete the cluster.
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+ clusterControllerClient.deleteClusterAsync(projectId, region, clusterName);
+ deleteClusterAsyncRequest.get();
+ System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName));
+
+ } catch (ExecutionException e) {
+      System.err.println(String.format("Error executing quickstart: %s", e.getMessage()));
+ }
+ }
+}
+// [END dataproc_quickstart]
diff --git a/dataproc/src/test/java/CreateClusterTest.java b/dataproc/src/test/java/CreateClusterTest.java
index 800856b02dc..539e0e2b624 100644
--- a/dataproc/src/test/java/CreateClusterTest.java
+++ b/dataproc/src/test/java/CreateClusterTest.java
@@ -38,18 +38,17 @@
@RunWith(JUnit4.class)
public class CreateClusterTest {
- private static final String BASE_CLUSTER_NAME = "test-cluster";
+ private static final String CLUSTER_NAME =
+ String.format("java-cc-test-%s", UUID.randomUUID().toString());
private static final String REGION = "us-central1";
+ private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
- private static String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");
- private String clusterName;
private ByteArrayOutputStream bout;
private static void requireEnv(String varName) {
assertNotNull(
- System.getenv(varName),
- String.format("Environment variable '%s' is required to perform these tests.", varName)
- );
+ String.format("Environment variable '%s' is required to perform these tests.", varName),
+ System.getenv(varName));
}
@BeforeClass
@@ -60,35 +59,30 @@ public static void checkRequirements() {
@Before
public void setUp() {
- clusterName = String.format("%s-%s", BASE_CLUSTER_NAME, UUID.randomUUID().toString());
-
bout = new ByteArrayOutputStream();
System.setOut(new PrintStream(bout));
}
@Test
public void createClusterTest() throws IOException, InterruptedException {
- CreateCluster.createCluster(projectId, REGION, clusterName);
+ CreateCluster.createCluster(PROJECT_ID, REGION, CLUSTER_NAME);
String output = bout.toString();
- assertThat(output, CoreMatchers.containsString(clusterName));
+ assertThat(output, CoreMatchers.containsString(CLUSTER_NAME));
}
@After
- public void tearDown() throws IOException, InterruptedException {
+ public void tearDown() throws IOException, InterruptedException, ExecutionException {
String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);
ClusterControllerSettings clusterControllerSettings =
ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
- try (ClusterControllerClient clusterControllerClient = ClusterControllerClient
- .create(clusterControllerSettings)) {
+ try (ClusterControllerClient clusterControllerClient =
+ ClusterControllerClient.create(clusterControllerSettings)) {
       OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
- clusterControllerClient.deleteClusterAsync(projectId, REGION, clusterName);
+ clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
deleteClusterAsyncRequest.get();
-
- } catch (ExecutionException e) {
- System.out.println("Error during cluster deletion: \n" + e.toString());
}
}
-}
\ No newline at end of file
+}
diff --git a/dataproc/src/test/java/QuickstartTest.java b/dataproc/src/test/java/QuickstartTest.java
new file mode 100644
index 00000000000..3296e224828
--- /dev/null
+++ b/dataproc/src/test/java/QuickstartTest.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class QuickstartTest {
+
+ private static final String MY_UUID = UUID.randomUUID().toString();
+ private static final String REGION = "us-central1";
+ private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+ private static final String ENDPOINT = String.format("%s-dataproc.googleapis.com:443", REGION);
+ private static final String CLUSTER_NAME = String.format("java-qs-test-%s", MY_UUID);
+ private static final String BUCKET_NAME = String.format("java-dataproc-qs-test-%s", MY_UUID);
+ private static final String JOB_FILE_NAME = "sum.py";
+ private static final String JOB_FILE_PATH =
+ String.format("gs://%s/%s", BUCKET_NAME, JOB_FILE_NAME);
+  private static final String SUM_CODE =
+      "import pyspark\n"
+          + "sc = pyspark.SparkContext()\n"
+          + "rdd = sc.parallelize((1,2,3,4,5))\n"
+          + "sum = rdd.reduce(lambda x, y: x + y)\n";
+
+ private ByteArrayOutputStream bout;
+ private Bucket bucket;
+ private Blob blob;
+
+ private static void requireEnv(String varName) {
+ assertNotNull(
+ String.format("Environment variable '%s' is required to perform these tests.", varName),
+ System.getenv(varName));
+ }
+
+ @BeforeClass
+ public static void checkRequirements() {
+ requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+ requireEnv("GOOGLE_CLOUD_PROJECT");
+ }
+
+ @Before
+ public void setUp() {
+ bout = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bout));
+
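+    // Create a temporary bucket and upload the PySpark job file that the quickstart will run.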
+ Storage storage = StorageOptions.getDefaultInstance().getService();
+ bucket = storage.create(BucketInfo.of(BUCKET_NAME));
+    blob = bucket.create(JOB_FILE_NAME, SUM_CODE.getBytes(UTF_8), "text/plain");
+ }
+
+ @Test
+ public void quickstartTest() throws IOException, InterruptedException {
+ Quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH);
+ String output = bout.toString();
+
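+    // Verify that each stage of the quickstart reported success.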
+ assertThat(output, CoreMatchers.containsString("Cluster created successfully"));
+ assertThat(output, CoreMatchers.containsString("Submitted job"));
+ assertThat(output, CoreMatchers.containsString("finished with state DONE:"));
+ assertThat(output, CoreMatchers.containsString("successfully deleted"));
+ }
+
+ @After
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
+ blob.delete();
+ bucket.delete();
+
+ ClusterControllerSettings clusterControllerSettings =
+ ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+ try (ClusterControllerClient clusterControllerClient =
+ ClusterControllerClient.create(clusterControllerSettings)) {
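+      // The quickstart deletes its cluster on success, so delete here only if it still exists.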
+ for (Cluster element :
+ clusterControllerClient.listClusters(PROJECT_ID, REGION).iterateAll()) {
+      if (element.getClusterName().equals(CLUSTER_NAME)) {
+        OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+ clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+ deleteClusterAsyncRequest.get();
+ break;
+ }
+ }
+ }
+ }
+}