Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1176,4 +1176,7 @@ TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption...
* @throws BigQueryException upon failure
*/
TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration);

// TODO(pongad): document
TableDataWriteChannel writer(JobId jobId, WriteChannelConfiguration writeChannelConfiguration);

This comment was marked as spam.

This comment was marked as spam.

}
Original file line number Diff line number Diff line change
Expand Up @@ -577,13 +577,20 @@ public Job getJob(JobId jobId, JobOption... options) {
final JobId completeJobId = jobId.setProjectId(getOptions().getProjectId());
try {
com.google.api.services.bigquery.model.Job answer =
runWithRetries(new Callable<com.google.api.services.bigquery.model.Job>() {
@Override
public com.google.api.services.bigquery.model.Job call() {
return bigQueryRpc.getJob(completeJobId.getProject(), completeJobId.getJob(),
optionsMap);
}
}, getOptions().getRetrySettings(), EXCEPTION_HANDLER, getOptions().getClock());
runWithRetries(
new Callable<com.google.api.services.bigquery.model.Job>() {
@Override
public com.google.api.services.bigquery.model.Job call() {
return bigQueryRpc.getJob(
completeJobId.getProject(),
completeJobId.getJob(),
completeJobId.getLocation(),
optionsMap);
}
},
getOptions().getRetrySettings(),
EXCEPTION_HANDLER,
getOptions().getClock());
return answer == null ? null : Job.fromPb(this, answer);
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
Expand Down Expand Up @@ -626,12 +633,17 @@ public boolean cancel(String jobId) {
public boolean cancel(JobId jobId) {
final JobId completeJobId = jobId.setProjectId(getOptions().getProjectId());
try {
return runWithRetries(new Callable<Boolean>() {
@Override
public Boolean call() {
return bigQueryRpc.cancel(completeJobId.getProject(), completeJobId.getJob());
}
}, getOptions().getRetrySettings(), EXCEPTION_HANDLER, getOptions().getClock());
return runWithRetries(
new Callable<Boolean>() {
@Override
public Boolean call() {
return bigQueryRpc.cancel(
completeJobId.getProject(), completeJobId.getJob(), completeJobId.getLocation());
}
},
getOptions().getRetrySettings(),
EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -662,13 +674,22 @@ private static QueryResponse getQueryResults(JobId jobId,
final JobId completeJobId = jobId.setProjectId(serviceOptions.getProjectId());
try {
GetQueryResultsResponse results =
runWithRetries(new Callable<GetQueryResultsResponse>() {
@Override
public GetQueryResultsResponse call() {
return serviceOptions.getBigQueryRpcV2().getQueryResults(
completeJobId.getProject(), completeJobId.getJob(), optionsMap);
}
}, serviceOptions.getRetrySettings(), EXCEPTION_HANDLER, serviceOptions.getClock());
runWithRetries(
new Callable<GetQueryResultsResponse>() {
@Override
public GetQueryResultsResponse call() {
return serviceOptions
.getBigQueryRpcV2()
.getQueryResults(
completeJobId.getProject(),
completeJobId.getJob(),
completeJobId.getLocation(),
optionsMap);
}
},
serviceOptions.getRetrySettings(),
EXCEPTION_HANDLER,
serviceOptions.getClock());
TableSchema schemaPb = results.getSchema();

ImmutableList.Builder<BigQueryError> errors = ImmutableList.builder();
Expand All @@ -691,7 +712,15 @@ public GetQueryResultsResponse call() {

@Override
public TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration) {
return new TableDataWriteChannel(getOptions(),
return writer(JobId.of(), writeChannelConfiguration);
}

@Override
public TableDataWriteChannel writer(
JobId jobId, WriteChannelConfiguration writeChannelConfiguration) {
return new TableDataWriteChannel(
getOptions(),
jobId.setProjectId(getOptions().getProjectId()),
writeChannelConfiguration.setProjectId(getOptions().getProjectId()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class JobId implements Serializable {
* "EU".
*/
@Nullable
abstract String getLocation();
public abstract String getLocation();

public abstract Builder toBuilder();

Expand All @@ -77,7 +77,7 @@ public Builder setRandomJob() {
return setJob(UUID.randomUUID().toString());
}

abstract Builder setLocation(String location);
public abstract Builder setLocation(String location);

public abstract JobId build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
Expand All @@ -37,9 +36,9 @@ public class TableDataWriteChannel extends

private Job job;

TableDataWriteChannel(BigQueryOptions options,
WriteChannelConfiguration writeChannelConfiguration) {
this(options, writeChannelConfiguration, open(options, writeChannelConfiguration));
TableDataWriteChannel(
BigQueryOptions options, JobId jobId, WriteChannelConfiguration writeChannelConfiguration) {
this(options, writeChannelConfiguration, open(options, jobId, writeChannelConfiguration));
}

TableDataWriteChannel(BigQueryOptions options, WriteChannelConfiguration config,
Expand Down Expand Up @@ -69,15 +68,26 @@ protected StateImpl.Builder stateBuilder() {
return StateImpl.builder(getOptions(), getEntity(), getUploadId(), job);
}

private static String open(final BigQueryOptions options,
private static String open(
final BigQueryOptions options,
final JobId jobId,
final WriteChannelConfiguration writeChannelConfiguration) {
try {
return runWithRetries(new Callable<String>() {
@Override
public String call() {
return options.getBigQueryRpcV2().open(writeChannelConfiguration.toPb());
}
}, options.getRetrySettings(), BigQueryImpl.EXCEPTION_HANDLER, options.getClock());
return runWithRetries(
new Callable<String>() {
@Override
public String call() {
return options
.getBigQueryRpcV2()
.open(
new com.google.api.services.bigquery.model.Job()
.setConfiguration(writeChannelConfiguration.toPb())
.setJobReference(jobId.toPb()));
}
},
options.getRetrySettings(),
BigQueryImpl.EXCEPTION_HANDLER,
options.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.google.cloud.bigquery.spi.v2;

import com.google.api.core.InternalExtensionOnly;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
Expand All @@ -29,6 +29,7 @@
import com.google.cloud.bigquery.BigQueryException;
import java.util.Map;

@InternalExtensionOnly

This comment was marked as spam.

This comment was marked as spam.

public interface BigQueryRpc extends ServiceRpc {

// These options are part of the Google Cloud BigQuery query parameters
Expand Down Expand Up @@ -176,7 +177,7 @@ TableDataList listTableData(
*
* @throws BigQueryException upon failure
*/
Job getJob(String projectId, String jobId, Map<Option, ?> options);
Job getJob(String projectId, String jobId, String location, Map<Option, ?> options);

/**
* Lists the project's jobs.
Expand All @@ -193,22 +194,23 @@ TableDataList listTableData(
* found
* @throws BigQueryException upon failure
*/
boolean cancel(String projectId, String jobId);
boolean cancel(String projectId, String jobId, String location);

/**
* Returns results of the query associated with the provided job.
*
* @throws BigQueryException upon failure
*/
GetQueryResultsResponse getQueryResults(String projectId, String jobId, Map<Option, ?> options);
GetQueryResultsResponse getQueryResults(
String projectId, String jobId, String location, Map<Option, ?> options);

/**
* Opens a resumable upload session to load data into a BigQuery table and returns an upload URI.
*
* @param configuration load configuration
* @throws BigQueryException upon failure
*/
String open(JobConfiguration configuration);
String open(Job job);

/**
* Uploads the provided data to the resumable upload session at the specified position. This
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson.JacksonFactory;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.DatasetList;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobList;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
Expand All @@ -59,6 +59,7 @@
import java.util.List;
import java.util.Map;

@InternalExtensionOnly
public class HttpBigQueryRpc implements BigQueryRpc {

public static final String DEFAULT_PROJECTION = "full";
Expand Down Expand Up @@ -308,10 +309,12 @@ public TableDataList listTableData(
}

@Override
public Job getJob(String projectId, String jobId, Map<Option, ?> options) {
public Job getJob(String projectId, String jobId, String location, Map<Option, ?> options) {
try {
return bigquery.jobs()
return bigquery
.jobs()
.get(projectId, jobId)
.setLocation(location)
.setFields(Option.FIELDS.getString(options))
.execute();
} catch (IOException ex) {
Expand Down Expand Up @@ -365,9 +368,9 @@ public Job apply(JobList.Jobs jobPb) {
}

@Override
public boolean cancel(String projectId, String jobId) {
public boolean cancel(String projectId, String jobId, String location) {
try {
bigquery.jobs().cancel(projectId, jobId).execute();
bigquery.jobs().cancel(projectId, jobId).setLocation(location).execute();
return true;
} catch (IOException ex) {
BigQueryException serviceException = translate(ex);
Expand All @@ -379,14 +382,19 @@ public boolean cancel(String projectId, String jobId) {
}

@Override
public GetQueryResultsResponse getQueryResults(String projectId, String jobId,
Map<Option, ?> options) {
public GetQueryResultsResponse getQueryResults(
String projectId, String jobId, String location, Map<Option, ?> options) {
try {
return bigquery.jobs().getQueryResults(projectId, jobId)
return bigquery
.jobs()
.getQueryResults(projectId, jobId)
.setLocation(location)
.setMaxResults(Option.MAX_RESULTS.getLong(options))
.setPageToken(Option.PAGE_TOKEN.getString(options))
.setStartIndex(Option.START_INDEX.getLong(options) != null
? BigInteger.valueOf(Option.START_INDEX.getLong(options)) : null)
.setStartIndex(
Option.START_INDEX.getLong(options) != null
? BigInteger.valueOf(Option.START_INDEX.getLong(options))
: null)
.setTimeoutMs(Option.TIMEOUT.getLong(options))
.execute();
} catch (IOException ex) {
Expand All @@ -395,9 +403,8 @@ public GetQueryResultsResponse getQueryResults(String projectId, String jobId,
}

@Override
public String open(JobConfiguration configuration) {
public String open(Job loadJob) {
try {
Job loadJob = new Job().setConfiguration(configuration);
String builder = BASE_RESUMABLE_URI + options.getProjectId() + "/jobs";
GenericUrl url = new GenericUrl(builder);
url.set("uploadType", "resumable");
Expand Down
Loading