From 338d3921876536c36a94e6bf2191c87d1ba834c5 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 5 Apr 2018 14:36:27 -0700 Subject: [PATCH 1/2] bigquery: implement regionalization --- .../com/google/cloud/bigquery/BigQuery.java | 6 ++ .../google/cloud/bigquery/BigQueryImpl.java | 71 +++++++++++++------ .../java/com/google/cloud/bigquery/JobId.java | 4 +- .../cloud/bigquery/spi/v2/BigQueryRpc.java | 9 ++- .../bigquery/spi/v2/HttpBigQueryRpc.java | 27 ++++--- .../cloud/bigquery/BigQueryImplTest.java | 31 ++++---- .../cloud/bigquery/it/ITBigQueryTest.java | 48 ++++++++++++- 7 files changed, 147 insertions(+), 49 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java index 7f216910f8d9..2e738e4e888b 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java @@ -985,6 +985,9 @@ TableResult listTableData( */ Job getJob(String jobId, JobOption... options); + // TODO(pongad): document + Job getJob(String jobId, String location, JobOption... options); + /** * Returns the requested job or {@code null} if not found. * @@ -1039,6 +1042,9 @@ TableResult listTableData( */ boolean cancel(String jobId); + // TODO(pongad): document + boolean cancel(String jobId, String location); + /** * Sends a job cancel request. This call will return immediately. The job status can then be * checked using either {@link #getJob(JobId, JobOption...)} or diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index 169b9bfd963a..91acb5ff4588 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -571,19 +571,31 @@ public Job getJob(String jobId, JobOption... options) { return getJob(JobId.of(jobId), options); } + @Override + public Job getJob(String jobId, String location, JobOption... options) { + return getJob(JobId.newBuilder().setJob(jobId).setLocation(location).build(), options); + } + @Override public Job getJob(JobId jobId, JobOption... options) { final Map optionsMap = optionMap(options); final JobId completeJobId = jobId.setProjectId(getOptions().getProjectId()); try { com.google.api.services.bigquery.model.Job answer = - runWithRetries(new Callable() { - @Override - public com.google.api.services.bigquery.model.Job call() { - return bigQueryRpc.getJob(completeJobId.getProject(), completeJobId.getJob(), - optionsMap); - } - }, getOptions().getRetrySettings(), EXCEPTION_HANDLER, getOptions().getClock()); + runWithRetries( + new Callable() { + @Override + public com.google.api.services.bigquery.model.Job call() { + return bigQueryRpc.getJob( + completeJobId.getProject(), + completeJobId.getJob(), + completeJobId.getLocation(), + optionsMap); + } + }, + getOptions().getRetrySettings(), + EXCEPTION_HANDLER, + getOptions().getClock()); return answer == null ? 
null : Job.fromPb(this, answer); } catch (RetryHelper.RetryHelperException e) { throw BigQueryException.translateAndThrow(e); @@ -622,16 +634,26 @@ public boolean cancel(String jobId) { return cancel(JobId.of(jobId)); } + @Override + public boolean cancel(String jobId, String location) { + return cancel(JobId.newBuilder().setJob(jobId).setLocation(location).build()); + } + @Override public boolean cancel(JobId jobId) { final JobId completeJobId = jobId.setProjectId(getOptions().getProjectId()); try { - return runWithRetries(new Callable() { - @Override - public Boolean call() { - return bigQueryRpc.cancel(completeJobId.getProject(), completeJobId.getJob()); - } - }, getOptions().getRetrySettings(), EXCEPTION_HANDLER, getOptions().getClock()); + return runWithRetries( + new Callable() { + @Override + public Boolean call() { + return bigQueryRpc.cancel( + completeJobId.getProject(), completeJobId.getJob(), completeJobId.getLocation()); + } + }, + getOptions().getRetrySettings(), + EXCEPTION_HANDLER, + getOptions().getClock()); } catch (RetryHelper.RetryHelperException e) { throw BigQueryException.translateAndThrow(e); } @@ -662,13 +684,22 @@ private static QueryResponse getQueryResults(JobId jobId, final JobId completeJobId = jobId.setProjectId(serviceOptions.getProjectId()); try { GetQueryResultsResponse results = - runWithRetries(new Callable() { - @Override - public GetQueryResultsResponse call() { - return serviceOptions.getBigQueryRpcV2().getQueryResults( - completeJobId.getProject(), completeJobId.getJob(), optionsMap); - } - }, serviceOptions.getRetrySettings(), EXCEPTION_HANDLER, serviceOptions.getClock()); + runWithRetries( + new Callable() { + @Override + public GetQueryResultsResponse call() { + return serviceOptions + .getBigQueryRpcV2() + .getQueryResults( + completeJobId.getProject(), + completeJobId.getJob(), + completeJobId.getLocation(), + optionsMap); + } + }, + serviceOptions.getRetrySettings(), + EXCEPTION_HANDLER, + serviceOptions.getClock()); TableSchema schemaPb = results.getSchema(); ImmutableList.Builder errors = ImmutableList.builder(); diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobId.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobId.java index 7d88dc8cc1ac..6d2336ede86b 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobId.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobId.java @@ -58,7 +58,7 @@ public abstract class JobId implements Serializable { * "EU". 
*/ @Nullable - abstract String getLocation(); + public abstract String getLocation(); public abstract Builder toBuilder(); @@ -77,7 +77,7 @@ public Builder setRandomJob() { return setJob(UUID.randomUUID().toString()); } - abstract Builder setLocation(String location); + public abstract Builder setLocation(String location); public abstract JobId build(); } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java index dcb24f337f56..bd2ccc3a9f67 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.spi.v2; +import com.google.api.core.InternalExtensionOnly; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.GetQueryResultsResponse; import com.google.api.services.bigquery.model.Job; @@ -29,6 +30,7 @@ import com.google.cloud.bigquery.BigQueryException; import java.util.Map; +@InternalExtensionOnly public interface BigQueryRpc extends ServiceRpc { // These options are part of the Google Cloud BigQuery query parameters @@ -176,7 +178,7 @@ TableDataList listTableData( * * @throws BigQueryException upon failure */ - Job getJob(String projectId, String jobId, Map options); + Job getJob(String projectId, String jobId, String location, Map options); /** * Lists the project's jobs. @@ -193,14 +195,15 @@ TableDataList listTableData( * found * @throws BigQueryException upon failure */ - boolean cancel(String projectId, String jobId); + boolean cancel(String projectId, String jobId, String location); /** * Returns results of the query associated with the provided job. * * @throws BigQueryException upon failure */ - GetQueryResultsResponse getQueryResults(String projectId, String jobId, Map options); + GetQueryResultsResponse getQueryResults( + String projectId, String jobId, String location, Map options); /** * Opens a resumable upload session to load data into a BigQuery table and returns an upload URI. 
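
The interface changes above boil down to addressing a job through a JobId that carries a location. A minimal caller-side sketch under that assumption, using the client surface introduced in this patch; the job id "job_abc" and the "EU" location are illustrative placeholders, not values taken from the patch:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;

public class RegionalJobLookup {
  public static void main(String... args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Placeholder id and location: a job created in the EU region has to be looked up
    // with its location set, otherwise the service reports it as not found.
    JobId jobId = JobId.newBuilder().setJob("job_abc").setLocation("EU").build();

    Job job = bigquery.getJob(jobId);   // jobs.get is issued with location=EU
    if (job != null) {
      System.out.println("state: " + job.getStatus().getState());
      bigquery.cancel(jobId);           // jobs.cancel is routed by location as well
    }
  }
}

Note that the String-based getJob(jobId, location) and cancel(jobId, location) overloads added in this first commit are removed again in the second commit, so the JobId-based form shown above is the one that survives the series.
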
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java index aca063998ce4..b14762567470 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java @@ -32,6 +32,7 @@ import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson.JacksonFactory; import com.google.api.core.InternalApi; +import com.google.api.core.InternalExtensionOnly; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.DatasetList; @@ -59,6 +60,7 @@ import java.util.List; import java.util.Map; +@InternalExtensionOnly public class HttpBigQueryRpc implements BigQueryRpc { public static final String DEFAULT_PROJECTION = "full"; @@ -308,10 +310,12 @@ public TableDataList listTableData( } @Override - public Job getJob(String projectId, String jobId, Map options) { + public Job getJob(String projectId, String jobId, String location, Map options) { try { - return bigquery.jobs() + return bigquery + .jobs() .get(projectId, jobId) + .setLocation(location) .setFields(Option.FIELDS.getString(options)) .execute(); } catch (IOException ex) { @@ -365,9 +369,9 @@ public Job apply(JobList.Jobs jobPb) { } @Override - public boolean cancel(String projectId, String jobId) { + public boolean cancel(String projectId, String jobId, String location) { try { - bigquery.jobs().cancel(projectId, jobId).execute(); + bigquery.jobs().cancel(projectId, jobId).setLocation(location).execute(); return true; } catch (IOException ex) { BigQueryException serviceException = translate(ex); @@ -379,14 +383,19 @@ public boolean cancel(String projectId, String jobId) { } @Override - public GetQueryResultsResponse getQueryResults(String projectId, String jobId, - Map options) { + public GetQueryResultsResponse getQueryResults( + String projectId, String jobId, String location, Map options) { try { - return bigquery.jobs().getQueryResults(projectId, jobId) + return bigquery + .jobs() + .getQueryResults(projectId, jobId) + .setLocation(location) .setMaxResults(Option.MAX_RESULTS.getLong(options)) .setPageToken(Option.PAGE_TOKEN.getString(options)) - .setStartIndex(Option.START_INDEX.getLong(options) != null - ? BigInteger.valueOf(Option.START_INDEX.getLong(options)) : null) + .setStartIndex( + Option.START_INDEX.getLong(options) != null + ? 
BigInteger.valueOf(Option.START_INDEX.getLong(options)) + : null) .setTimeoutMs(Option.TIMEOUT.getLong(options)) .execute(); } catch (IOException ex) { diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index af58e7e292f7..1893cf3bed19 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -963,7 +963,10 @@ public JobId get() { .andThrow(new BigQueryException(409, "already exists, for some reason")); EasyMock.expect( bigqueryRpcMock.getJob( - EasyMock.anyString(), EasyMock.eq(id), EasyMock.eq(EMPTY_RPC_OPTIONS))) + EasyMock.anyString(), + EasyMock.eq(id), + EasyMock.eq((String) null), + EasyMock.eq(EMPTY_RPC_OPTIONS))) .andReturn(newJobPb()); EasyMock.replay(bigqueryRpcMock); @@ -994,7 +997,7 @@ public void testCreateJobWithProjectId() { @Test public void testGetJob() { - EasyMock.expect(bigqueryRpcMock.getJob(PROJECT, JOB, EMPTY_RPC_OPTIONS)) + EasyMock.expect(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)) .andReturn(COMPLETE_COPY_JOB.toPb()); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); @@ -1004,7 +1007,7 @@ public void testGetJob() { @Test public void testGetJobFromJobId() { - EasyMock.expect(bigqueryRpcMock.getJob(PROJECT, JOB, EMPTY_RPC_OPTIONS)) + EasyMock.expect(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)) .andReturn(COMPLETE_COPY_JOB.toPb()); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); @@ -1016,7 +1019,7 @@ public void testGetJobFromJobId() { public void testGetJobFromJobIdWithProject() { JobId jobId = JobId.of(OTHER_PROJECT, JOB); JobInfo jobInfo = COPY_JOB.setProjectId(OTHER_PROJECT); - EasyMock.expect(bigqueryRpcMock.getJob(OTHER_PROJECT, JOB, EMPTY_RPC_OPTIONS)) + EasyMock.expect(bigqueryRpcMock.getJob(OTHER_PROJECT, JOB, null, EMPTY_RPC_OPTIONS)) .andReturn(jobInfo.toPb()); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); @@ -1114,7 +1117,7 @@ public com.google.api.services.bigquery.model.Job apply(Job job) { @Test public void testCancelJob() { - EasyMock.expect(bigqueryRpcMock.cancel(PROJECT, JOB)).andReturn(true); + EasyMock.expect(bigqueryRpcMock.cancel(PROJECT, JOB, null)).andReturn(true); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); assertTrue(bigquery.cancel(JOB)); @@ -1122,7 +1125,7 @@ public void testCancelJob() { @Test public void testCancelJobFromJobId() { - EasyMock.expect(bigqueryRpcMock.cancel(PROJECT, JOB)).andReturn(true); + EasyMock.expect(bigqueryRpcMock.cancel(PROJECT, JOB, null)).andReturn(true); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); assertTrue(bigquery.cancel(JobId.of(PROJECT, JOB))); @@ -1131,7 +1134,7 @@ public void testCancelJobFromJobId() { @Test public void testCancelJobFromJobIdWithProject() { JobId jobId = JobId.of(OTHER_PROJECT, JOB); - EasyMock.expect(bigqueryRpcMock.cancel(OTHER_PROJECT, JOB)).andReturn(true); + EasyMock.expect(bigqueryRpcMock.cancel(OTHER_PROJECT, JOB, null)).andReturn(true); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); assertTrue(bigquery.cancel(jobId)); @@ -1164,7 +1167,7 @@ public void testQueryRequestCompleted() throws InterruptedException { .andReturn(jobResponsePb); EasyMock.expect( bigqueryRpcMock.getQueryResults( - PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) + 
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) .andReturn(responsePb); EasyMock.expect( bigqueryRpcMock.listTableData( @@ -1217,7 +1220,7 @@ public void testQueryRequestCompletedOptions() throws InterruptedException { EasyMock.expect( bigqueryRpcMock.getQueryResults( - PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) + PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) .andReturn(responsePb); EasyMock.expect(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, optionMap)) .andReturn( @@ -1271,11 +1274,11 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti EasyMock.expect( bigqueryRpcMock.getQueryResults( - PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) + PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) .andReturn(responsePb1); EasyMock.expect( bigqueryRpcMock.getQueryResults( - PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) + PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) .andReturn(responsePb2); EasyMock.expect( bigqueryRpcMock.listTableData( @@ -1310,7 +1313,7 @@ public void testGetQueryResults() { .setPageToken(CURSOR) .setTotalBytesProcessed(42L) .setTotalRows(BigInteger.valueOf(1L)); - EasyMock.expect(bigqueryRpcMock.getQueryResults(PROJECT, JOB, EMPTY_RPC_OPTIONS)) + EasyMock.expect(bigqueryRpcMock.getQueryResults(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)) .andReturn(responsePb); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); @@ -1332,7 +1335,7 @@ public void testGetQueryResultsWithProject() { .setPageToken(CURSOR) .setTotalBytesProcessed(42L) .setTotalRows(BigInteger.valueOf(1L)); - EasyMock.expect(bigqueryRpcMock.getQueryResults(OTHER_PROJECT, JOB, EMPTY_RPC_OPTIONS)) + EasyMock.expect(bigqueryRpcMock.getQueryResults(OTHER_PROJECT, JOB, null, EMPTY_RPC_OPTIONS)) .andReturn(responsePb); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); @@ -1353,7 +1356,7 @@ public void testGetQueryResultsWithOptions() { .setPageToken(CURSOR) .setTotalBytesProcessed(42L) .setTotalRows(BigInteger.valueOf(1L)); - EasyMock.expect(bigqueryRpcMock.getQueryResults(PROJECT, JOB, QUERY_RESULTS_OPTIONS)) + EasyMock.expect(bigqueryRpcMock.getQueryResults(PROJECT, JOB, null, QUERY_RESULTS_OPTIONS)) .andReturn(responsePb); EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 0dd220fd1331..e5bbb0f24602 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -29,6 +29,7 @@ import com.google.api.gax.paging.Page; import com.google.cloud.RetryOption; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; import com.google.cloud.bigquery.BigQuery.DatasetField; import com.google.cloud.bigquery.BigQuery.DatasetOption; import com.google.cloud.bigquery.BigQuery.JobField; @@ -91,6 +92,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -1095,7 +1097,6 @@ public void testQueryJobWithDryRun() throws 
InterruptedException, TimeoutExcepti .setDryRun(true) .build(); Job remoteJob = bigquery.create(JobInfo.of(configuration)); - System.out.println("job (dryrun): " + remoteJob); assertNull(remoteJob.getJobId().getJob()); assertEquals(DONE, remoteJob.getStatus().getState()); assertNotNull(remoteJob.getConfiguration()); @@ -1216,4 +1217,49 @@ public void testInsertFromFile() throws InterruptedException, IOException, Timeo assertEquals(2, rowCount); assertTrue(bigquery.delete(DATASET, destinationTableName)); } + + @Test + public void testLocationRoundTrip() throws Exception { + String location = "EU"; + String wrongLocation = "US"; + + assertThat(location).isNotEqualTo(wrongLocation); + + Dataset dataset = + bigquery.create( + DatasetInfo.newBuilder("locationset_" + UUID.randomUUID().toString().replace("-", "_")) + .setLocation(location) + .build()); + try { + TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), "sometable"); + Schema schema = Schema.of(Field.of("name", LegacySQLTypeName.STRING)); + TableDefinition tableDef = StandardTableDefinition.of(schema); + Table table = bigquery.create(TableInfo.newBuilder(tableId, tableDef).build()); + + String query = + String.format( + "SELECT * FROM `%s.%s.%s`", + table.getTableId().getProject(), + table.getTableId().getDataset(), + table.getTableId().getTable()); + Job job = + bigquery.create( + JobInfo.of( + JobId.newBuilder().setLocation(location).build(), + QueryJobConfiguration.of(query))); + job = job.waitFor(); + assertThat(job.getStatus().getError()).isNull(); + + assertThat(job.getJobId().getLocation()).isEqualTo(location); + + // Roundtripped location in job id should work. + assertThat(bigquery.getJob(job.getJobId())).isNotNull(); + + // Wrong location shouldn't work. + assertThat(bigquery.getJob(job.getJobId().toBuilder().setLocation(wrongLocation).build())) + .isNull(); + } finally { + bigquery.delete(dataset.getDatasetId(), DatasetDeleteOption.deleteContents()); + } + } } From 75c00d4560c0db7491261ccb97ab2844db6f2039 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 9 Apr 2018 18:19:39 -0700 Subject: [PATCH 2/2] also make write work --- .../com/google/cloud/bigquery/BigQuery.java | 9 +- .../google/cloud/bigquery/BigQueryImpl.java | 20 ++- .../cloud/bigquery/TableDataWriteChannel.java | 32 +++-- .../cloud/bigquery/spi/v2/BigQueryRpc.java | 3 +- .../bigquery/spi/v2/HttpBigQueryRpc.java | 4 +- .../cloud/bigquery/BigQueryImplTest.java | 14 -- .../bigquery/TableDataWriteChannelTest.java | 135 +++++++++++++----- .../cloud/bigquery/it/ITBigQueryTest.java | 87 ++++++++--- 8 files changed, 205 insertions(+), 99 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java index 2e738e4e888b..28e5f8671411 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java @@ -985,9 +985,6 @@ TableResult listTableData( */ Job getJob(String jobId, JobOption... options); - // TODO(pongad): document - Job getJob(String jobId, String location, JobOption... options); - /** * Returns the requested job or {@code null} if not found. * @@ -1042,9 +1039,6 @@ TableResult listTableData( */ boolean cancel(String jobId); - // TODO(pongad): document - boolean cancel(String jobId, String location); - /** * Sends a job cancel request. This call will return immediately. 
The job status can then be * checked using either {@link #getJob(JobId, JobOption...)} or @@ -1182,4 +1176,7 @@ TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... * @throws BigQueryException upon failure */ TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration); + + // TODO(pongad): document + TableDataWriteChannel writer(JobId jobId, WriteChannelConfiguration writeChannelConfiguration); } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index 91acb5ff4588..25ea72af4563 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -571,11 +571,6 @@ public Job getJob(String jobId, JobOption... options) { return getJob(JobId.of(jobId), options); } - @Override - public Job getJob(String jobId, String location, JobOption... options) { - return getJob(JobId.newBuilder().setJob(jobId).setLocation(location).build(), options); - } - @Override public Job getJob(JobId jobId, JobOption... options) { final Map optionsMap = optionMap(options); @@ -634,11 +629,6 @@ public boolean cancel(String jobId) { return cancel(JobId.of(jobId)); } - @Override - public boolean cancel(String jobId, String location) { - return cancel(JobId.newBuilder().setJob(jobId).setLocation(location).build()); - } - @Override public boolean cancel(JobId jobId) { final JobId completeJobId = jobId.setProjectId(getOptions().getProjectId()); @@ -722,7 +712,15 @@ public GetQueryResultsResponse call() { @Override public TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration) { - return new TableDataWriteChannel(getOptions(), + return writer(JobId.of(), writeChannelConfiguration); + } + + @Override + public TableDataWriteChannel writer( + JobId jobId, WriteChannelConfiguration writeChannelConfiguration) { + return new TableDataWriteChannel( + getOptions(), + jobId.setProjectId(getOptions().getProjectId()), writeChannelConfiguration.setProjectId(getOptions().getProjectId())); } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java index 47f9e8d4ad81..b89ee6c0e423 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java @@ -22,7 +22,6 @@ import com.google.cloud.RestorableState; import com.google.cloud.RetryHelper; import com.google.cloud.WriteChannel; - import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; @@ -37,9 +36,9 @@ public class TableDataWriteChannel extends private Job job; - TableDataWriteChannel(BigQueryOptions options, - WriteChannelConfiguration writeChannelConfiguration) { - this(options, writeChannelConfiguration, open(options, writeChannelConfiguration)); + TableDataWriteChannel( + BigQueryOptions options, JobId jobId, WriteChannelConfiguration writeChannelConfiguration) { + this(options, writeChannelConfiguration, open(options, jobId, writeChannelConfiguration)); } TableDataWriteChannel(BigQueryOptions options, WriteChannelConfiguration config, @@ -69,15 +68,26 @@ protected StateImpl.Builder stateBuilder() { return StateImpl.builder(getOptions(), getEntity(), getUploadId(), job); } - private 
static String open(final BigQueryOptions options, + private static String open( + final BigQueryOptions options, + final JobId jobId, final WriteChannelConfiguration writeChannelConfiguration) { try { - return runWithRetries(new Callable() { - @Override - public String call() { - return options.getBigQueryRpcV2().open(writeChannelConfiguration.toPb()); - } - }, options.getRetrySettings(), BigQueryImpl.EXCEPTION_HANDLER, options.getClock()); + return runWithRetries( + new Callable() { + @Override + public String call() { + return options + .getBigQueryRpcV2() + .open( + new com.google.api.services.bigquery.model.Job() + .setConfiguration(writeChannelConfiguration.toPb()) + .setJobReference(jobId.toPb())); + } + }, + options.getRetrySettings(), + BigQueryImpl.EXCEPTION_HANDLER, + options.getClock()); } catch (RetryHelper.RetryHelperException e) { throw BigQueryException.translateAndThrow(e); } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java index bd2ccc3a9f67..46031adae64e 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java @@ -20,7 +20,6 @@ import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.GetQueryResultsResponse; import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableDataInsertAllRequest; import com.google.api.services.bigquery.model.TableDataInsertAllResponse; @@ -211,7 +210,7 @@ GetQueryResultsResponse getQueryResults( * @param configuration load configuration * @throws BigQueryException upon failure */ - String open(JobConfiguration configuration); + String open(Job job); /** * Uploads the provided data to the resumable upload session at the specified position. 
This diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java index b14762567470..2f56d5386164 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/HttpBigQueryRpc.java @@ -39,7 +39,6 @@ import com.google.api.services.bigquery.model.DatasetReference; import com.google.api.services.bigquery.model.GetQueryResultsResponse; import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobList; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; @@ -404,9 +403,8 @@ public GetQueryResultsResponse getQueryResults( } @Override - public String open(JobConfiguration configuration) { + public String open(Job loadJob) { try { - Job loadJob = new Job().setConfiguration(configuration); String builder = BASE_RESUMABLE_URI + options.getProjectId() + "/jobs"; GenericUrl url = new GenericUrl(builder); url.set("uploadType", "resumable"); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index 1893cf3bed19..58f0eb67e674 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -38,7 +38,6 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.ServiceOptions; import com.google.cloud.Tuple; -import com.google.cloud.WriteChannel; import com.google.cloud.bigquery.BigQuery.QueryResultsOption; import com.google.cloud.bigquery.InsertAllRequest.RowToInsert; import com.google.cloud.bigquery.spi.BigQueryRpcFactory; @@ -1371,19 +1370,6 @@ public void testGetQueryResultsWithOptions() { assertEquals(null, response.getSchema()); } - @Test - public void testWriter() { - WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.of(TABLE_ID); - EasyMock.expect( - bigqueryRpcMock.open(WriteChannelConfiguration.of(TABLE_ID_WITH_PROJECT).toPb())) - .andReturn("upload-id"); - EasyMock.replay(bigqueryRpcMock); - bigquery = options.getService(); - WriteChannel channel = bigquery.writer(writeChannelConfiguration); - assertNotNull(channel); - assertTrue(channel.isOpen()); - } - @Test public void testRetryableException() { EasyMock.expect(bigqueryRpcMock.getDataset(PROJECT, DATASET, EMPTY_RPC_OPTIONS)) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java index b83f52034f00..341380471672 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java @@ -32,9 +32,13 @@ import com.google.cloud.RestorableState; import com.google.cloud.WriteChannel; -import com.google.cloud.bigquery.spi.v2.BigQueryRpc; import com.google.cloud.bigquery.spi.BigQueryRpcFactory; - +import com.google.cloud.bigquery.spi.v2.BigQueryRpc; +import java.io.IOException; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.Arrays; 
+import java.util.Random; import org.easymock.Capture; import org.easymock.CaptureType; import org.junit.After; @@ -43,12 +47,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import java.io.IOException; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Random; - public class TableDataWriteChannelTest { private static final String UPLOAD_ID = "uploadid"; @@ -67,7 +65,7 @@ public class TableDataWriteChannelTest { private static final Random RANDOM = new Random(); private static final LoadJobConfiguration JOB_CONFIGURATION = LoadJobConfiguration.of(TABLE_ID, "URI"); - private static final JobInfo JOB_INFO = JobInfo.of(JOB_CONFIGURATION); + private static final JobInfo JOB_INFO = JobInfo.of(JobId.of(), JOB_CONFIGURATION); @Rule public ExpectedException thrown = ExpectedException.none(); @@ -108,9 +106,14 @@ public void tearDown() throws Exception { @Test public void testCreate() { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); replay(bigqueryRpcMock); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); assertTrue(writer.isOpen()); assertNull(writer.getJob()); } @@ -118,39 +121,64 @@ public void testCreate() { @Test public void testCreateRetryableError() { BigQueryException exception = new BigQueryException(new SocketException("Socket closed")); - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andThrow(exception); - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andThrow(exception); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); replay(bigqueryRpcMock); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); assertTrue(writer.isOpen()); assertNull(writer.getJob()); } @Test public void testCreateNonRetryableError() { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andThrow(new RuntimeException()); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andThrow(new RuntimeException()); replay(bigqueryRpcMock); thrown.expect(RuntimeException.class); - new TableDataWriteChannel(options, LOAD_CONFIGURATION); + new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); } @Test public void testWriteWithoutFlush() throws IOException { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); replay(bigqueryRpcMock); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, 
JOB_INFO.getJobId(), LOAD_CONFIGURATION); assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE))); assertNull(writer.getJob()); } @Test public void testWriteWithFlush() throws IOException { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); expect(bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(CUSTOM_CHUNK_SIZE), eq(false))).andReturn(null); replay(bigqueryRpcMock); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); writer.setChunkSize(CUSTOM_CHUNK_SIZE); ByteBuffer buffer = randomBuffer(CUSTOM_CHUNK_SIZE); assertEquals(CUSTOM_CHUNK_SIZE, writer.write(buffer)); @@ -160,12 +188,17 @@ public void testWriteWithFlush() throws IOException { @Test public void testWritesAndFlush() throws IOException { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); expect(bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(DEFAULT_CHUNK_SIZE), eq(false))).andReturn(null); replay(bigqueryRpcMock); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE]; for (int i = 0; i < buffers.length; i++) { buffers[i] = randomBuffer(MIN_CHUNK_SIZE); @@ -182,13 +215,18 @@ public void testWritesAndFlush() throws IOException { @Test public void testCloseWithoutFlush() throws IOException { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); expect(bigqueryRpcMock.write( eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true))) .andReturn(job.toPb()); replay(bigqueryRpcMock); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); assertTrue(writer.isOpen()); writer.close(); assertArrayEquals(new byte[0], capturedBuffer.getValue()); @@ -198,14 +236,19 @@ public void testCloseWithoutFlush() throws IOException { @Test public void testCloseWithFlush() throws IOException { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); expect(bigqueryRpcMock.write( eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(true))) .andReturn(job.toPb()); replay(bigqueryRpcMock); - writer = new 
TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); assertTrue(writer.isOpen()); writer.write(buffer); writer.close(); @@ -217,13 +260,18 @@ public void testCloseWithFlush() throws IOException { @Test public void testWriteClosed() throws IOException { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); expect(bigqueryRpcMock.write( eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true))) .andReturn(job.toPb()); replay(bigqueryRpcMock); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); writer.close(); assertEquals(job, writer.getJob()); try { @@ -236,7 +284,12 @@ public void testWriteClosed() throws IOException { @Test public void testSaveAndRestore() throws IOException { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(CaptureType.ALL); Capture capturedPosition = Capture.newInstance(CaptureType.ALL); expect(bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), @@ -244,7 +297,7 @@ public void testSaveAndRestore() throws IOException { replay(bigqueryRpcMock); ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE); ByteBuffer buffer2 = randomBuffer(DEFAULT_CHUNK_SIZE); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1)); assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0)); assertEquals(new Long(0L), capturedPosition.getValues().get(0)); @@ -258,13 +311,18 @@ public void testSaveAndRestore() throws IOException { @Test public void testSaveAndRestoreClosed() throws IOException { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); expect(bigqueryRpcMock.write( eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true))) .andReturn(job.toPb()); replay(bigqueryRpcMock); - writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); writer.close(); assertEquals(job, writer.getJob()); RestorableState writerState = writer.capture(); @@ -282,12 +340,19 @@ public void testSaveAndRestoreClosed() throws IOException { @Test public void testStateEquals() { - expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID).times(2); + expect( + bigqueryRpcMock.open( + new com.google.api.services.bigquery.model.Job() + .setJobReference(JOB_INFO.getJobId().toPb()) + .setConfiguration(LOAD_CONFIGURATION.toPb()))) + .andReturn(UPLOAD_ID) + .times(2); replay(bigqueryRpcMock); - 
writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); // avoid closing when you don't want partial writes upon failure @SuppressWarnings("resource") - WriteChannel writer2 = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + WriteChannel writer2 = + new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION); RestorableState state = writer.capture(); RestorableState state2 = writer2.capture(); assertEquals(state, state2); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index e5bbb0f24602..29d4c19ed74e 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -1219,7 +1219,7 @@ public void testInsertFromFile() throws InterruptedException, IOException, Timeo } @Test - public void testLocationRoundTrip() throws Exception { + public void testLocation() throws Exception { String location = "EU"; String wrongLocation = "US"; @@ -1242,22 +1242,75 @@ public void testLocationRoundTrip() throws Exception { table.getTableId().getProject(), table.getTableId().getDataset(), table.getTableId().getTable()); - Job job = - bigquery.create( - JobInfo.of( - JobId.newBuilder().setLocation(location).build(), - QueryJobConfiguration.of(query))); - job = job.waitFor(); - assertThat(job.getStatus().getError()).isNull(); - - assertThat(job.getJobId().getLocation()).isEqualTo(location); - - // Roundtripped location in job id should work. - assertThat(bigquery.getJob(job.getJobId())).isNotNull(); - - // Wrong location shouldn't work. - assertThat(bigquery.getJob(job.getJobId().toBuilder().setLocation(wrongLocation).build())) - .isNull(); + + // Test create/get + { + Job job = + bigquery.create( + JobInfo.of( + JobId.newBuilder().setLocation(location).build(), + QueryJobConfiguration.of(query))); + job = job.waitFor(); + assertThat(job.getStatus().getError()).isNull(); + + assertThat(job.getJobId().getLocation()).isEqualTo(location); + + JobId jobId = job.getJobId(); + JobId wrongId = jobId.toBuilder().setLocation(wrongLocation).build(); + + // Getting with location should work. + assertThat(bigquery.getJob(jobId)).isNotNull(); + // Getting with wrong location shouldn't work. + assertThat(bigquery.getJob(wrongId)).isNull(); + + // Cancelling with location should work. (Cancelling already finished job is fine.) + assertThat(bigquery.cancel(jobId)).isTrue(); + // Cancelling with wrong location shouldn't work. 
+ assertThat(bigquery.cancel(wrongId)).isFalse(); + } + + // Test query + { + assertThat( + bigquery + .query( + QueryJobConfiguration.of(query), + JobId.newBuilder().setLocation(location).build()) + .iterateAll()) + .isEmpty(); + + try { + bigquery + .query( + QueryJobConfiguration.of(query), + JobId.newBuilder().setLocation(wrongLocation).build()) + .iterateAll(); + fail("querying a table with wrong location shouldn't work"); + } catch (BigQueryException e) { + // Nothing to do + } + } + + // Test write + { + WriteChannelConfiguration writeChannelConfiguration = + WriteChannelConfiguration.newBuilder(tableId) + .setFormatOptions(FormatOptions.csv()) + .build(); + try (TableDataWriteChannel writer = + bigquery.writer( + JobId.newBuilder().setLocation(location).build(), writeChannelConfiguration)) { + writer.write(ByteBuffer.wrap("foo".getBytes())); + } + + try { + bigquery.writer( + JobId.newBuilder().setLocation(wrongLocation).build(), writeChannelConfiguration); + fail("writing to a table with wrong location shouldn't work"); + } catch (BigQueryException e) { + // Nothing to do + } + } } finally { bigquery.delete(dataset.getDatasetId(), DatasetDeleteOption.deleteContents()); }
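
Mirroring the integration test above, a minimal caller-side sketch of the new writer(JobId, WriteChannelConfiguration) overload added in the second commit; the dataset and table names, the "EU" location, and the CSV payload are placeholders for illustration only:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RegionalLoadExample {
  public static void main(String... args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Placeholder table, assumed to live in an EU-located dataset.
    TableId tableId = TableId.of("eu_dataset", "names");
    WriteChannelConfiguration config =
        WriteChannelConfiguration.newBuilder(tableId)
            .setFormatOptions(FormatOptions.csv())
            .build();

    // The JobId-accepting writer lets the caller pin the resulting load job
    // to the dataset's location.
    TableDataWriteChannel writer =
        bigquery.writer(JobId.newBuilder().setLocation("EU").build(), config);
    try {
      writer.write(ByteBuffer.wrap("alice\nbob\n".getBytes(StandardCharsets.UTF_8)));
    } finally {
      writer.close();
    }

    // After close(), the channel exposes the load job; waitFor() blocks until it completes.
    Job job = writer.getJob().waitFor();
    System.out.println("load finished with state " + job.getStatus().getState());
  }
}

As in the integration test, passing a JobId whose location does not match the dataset is expected to fail with a BigQueryException when the upload session is opened.
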