ConnectionImpl.java
@@ -192,7 +192,7 @@ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException {
if (isFastQuerySupported()) {
String projectId = bigQueryOptions.getProjectId();
QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null);
return queryRpc(projectId, queryRequest, false);
return queryRpc(projectId, queryRequest, sql, false);
}
// use jobs.insert otherwise
com.google.api.services.bigquery.model.Job queryJob =
@@ -236,7 +236,7 @@ public BigQueryResult executeSelect(
final String projectId = bigQueryOptions.getProjectId();
final QueryRequest queryRequest =
createQueryRequest(connectionSettings, sql, parameters, labelMap);
return queryRpc(projectId, queryRequest, parameters != null);
return queryRpc(projectId, queryRequest, sql, parameters != null);
}
// use jobs.insert otherwise
com.google.api.services.bigquery.model.Job queryJob =
@@ -289,7 +289,10 @@ public int size() {
}

private BigQueryResult queryRpc(
final String projectId, final QueryRequest queryRequest, Boolean hasQueryParameters) {
final String projectId,
final QueryRequest queryRequest,
String sql,
Boolean hasQueryParameters) {
com.google.api.services.bigquery.model.QueryResponse results;
try {
results =
@@ -322,8 +325,29 @@ private BigQueryResult queryRpc(
// and can be optimized here, but this is left as future work.
Long totalRows = results.getTotalRows() == null ? null : results.getTotalRows().longValue();
Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size());
logger.log(
Level.WARNING,
"\n"
+ String.format(
"results.getJobComplete(): %s, isSchemaNull: %s , totalRows: %s, pageRows: %s",
results.getJobComplete(), results.getSchema() == null, totalRows, pageRows));
JobId jobId = JobId.fromPb(results.getJobReference());
GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
// We might get null schema from the backend occasionally. Ref:
// https://github.com/googleapis/java-bigquery/issues/2103. Using queryDryRun in such cases
// to get the schema
if (firstPage.getSchema() == null) { // get schema using dry run
// Log the job completion status, since the schema was missing from the first page
logger.log(
Level.WARNING,
"\n"
+ "Received null schema, using dryRun to get the schema. jobComplete: "
+ firstPage.getJobComplete());
com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, schema, hasQueryParameters);
}
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, hasQueryParameters);
}
@@ -1243,7 +1267,8 @@ com.google.api.services.bigquery.model.Job createQueryJob(
}

// Used by dryRun
private com.google.api.services.bigquery.model.Job createDryRunJob(String sql) {
@VisibleForTesting
com.google.api.services.bigquery.model.Job createDryRunJob(String sql) {
com.google.api.services.bigquery.model.JobConfiguration configurationPb =
new com.google.api.services.bigquery.model.JobConfiguration();
configurationPb.setDryRun(true);
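Taken together, the ConnectionImpl changes implement one fallback: when the first page of fast-query results comes back without a schema, the same SQL is re-submitted as a dry-run job and the schema is read from the dry-run statistics. The sketch below condenses that flow; the helper method itself is illustrative only (it does not exist in this PR), while createDryRunJob(sql) and Schema.fromPb(...) are the calls the diff actually uses.

// Illustrative condensation of the null-schema handling added to queryRpc().
private Schema resolveSchema(String sql, GetQueryResultsResponse firstPage) {
  if (firstPage.getSchema() != null) {
    // Normal case: the first page already carries the schema.
    return Schema.fromPb(firstPage.getSchema());
  }
  // The backend occasionally omits the schema (issue #2103); a dry run always reports one.
  com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
  return Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
}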
ConnectionImplTest.java
@@ -93,6 +93,26 @@ public class ConnectionImplTest {
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(FAST_QUERY_TABLESCHEMA);

private static final GetQueryResultsResponse GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA =
new GetQueryResultsResponse()
.setJobReference(QUERY_JOB.toPb())
.setRows(ImmutableList.of(TABLE_ROW))
.setJobComplete(false)
.setPageToken(PAGE_TOKEN)
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(null);

private static final List<TableRow> TABLE_ROWS =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

private BigQueryOptions createBigQueryOptionsForProject(
String project, BigQueryRpcFactory rpcFactory) {
return BigQueryOptions.newBuilder()
@@ -211,24 +231,13 @@ public void testQueryDryRun() throws BigQuerySQLException {

@Test
public void testParseDataTask() throws InterruptedException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(2);
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue = new LinkedBlockingDeque<>(2);
rpcResponseQueue.offer(Tuple.of(null, false));
// This call should populate page cache
ConnectionImpl connectionSpy = Mockito.spy(connection);
connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue);
connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue);
Tuple<Iterable<FieldValueList>, Boolean> fvlTupple =
pageCache.take(); // wait for the parser thread to parse the data
assertNotNull(fvlTupple);
@@ -247,16 +256,6 @@ public void testParseDataTask() throws InterruptedException {

@Test
public void testPopulateBuffer() throws InterruptedException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(2);
@@ -266,7 +265,7 @@ public void testPopulateBuffer() throws InterruptedException {
// This call should populate page cache
ConnectionImpl connectionSpy = Mockito.spy(connection);

connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue);
connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue);

verify(connectionSpy, times(1))
.parseRpcDataAsync(
@@ -358,19 +357,62 @@ public void testLegacyQuerySinglePage() throws BigQuerySQLException {
.createJobForQuery(any(com.google.api.services.bigquery.model.Job.class));
}

// calls executeSelect with a fast query and emulates the case where no schema is returned with
// the first page of results
@Test
public void testFastQueryNullSchema() throws BigQuerySQLException {
ConnectionImpl connectionSpy = Mockito.spy(connection);
QueryRequest queryReqMock = new QueryRequest();
com.google.api.services.bigquery.model.JobStatistics stats =
new com.google.api.services.bigquery.model.JobStatistics()
.setQuery(new JobStatistics2().setSchema(FAST_QUERY_TABLESCHEMA));
com.google.api.services.bigquery.model.Job jobResponseMock =
new com.google.api.services.bigquery.model.Job()
.setJobReference(QUERY_JOB.toPb())
.setId(JOB)
.setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"))
.setStatistics(stats);
// emulating a fast query
doReturn(true).when(connectionSpy).isFastQuerySupported();
com.google.api.services.bigquery.model.QueryResponse mockQueryRes =
new QueryResponse()
.setSchema(FAST_QUERY_TABLESCHEMA)
.setJobComplete(false) // so that it goes to the else part in queryRpc
.setTotalRows(new BigInteger(String.valueOf(4L)))
.setJobReference(QUERY_JOB.toPb())
.setRows(TABLE_ROWS);
when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
.thenReturn(mockQueryRes);
doReturn(GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA) // wiring the null schema for the test case
.when(connectionSpy)
.getQueryResultsFirstPage(any(JobId.class));
doReturn(BQ_RS_MOCK_RES)
.when(connectionSpy)
.getSubsequentQueryResultsWithJob(
any(Long.class),
any(Long.class),
any(JobId.class),
any(GetQueryResultsResponse.class),
any(Schema.class),
any(Boolean.class));
doReturn(jobResponseMock).when(connectionSpy).createDryRunJob(any(String.class));
BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);
assertEquals(res.getTotalRows(), 2);
assertEquals(QUERY_SCHEMA, res.getSchema());
verify(connectionSpy, times(1))
.getSubsequentQueryResultsWithJob(
any(Long.class),
any(Long.class),
any(JobId.class),
any(GetQueryResultsResponse.class),
any(Schema.class),
any(Boolean.class));
}
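Since createDryRunJob is now @VisibleForTesting, the test above could also assert that the dry-run fallback was actually taken; a possible extra check (not part of this change) would be:

verify(connectionSpy, times(1)).createDryRunJob(any(String.class));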

// exercises getSubsequentQueryResultsWithJob for long-running fast queries
@Test
public void testFastQueryLongRunning() throws SQLException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));
ConnectionImpl connectionSpy = Mockito.spy(connection);
// emulating a fast query
doReturn(true).when(connectionSpy).isFastQuerySupported();
@@ -389,7 +431,7 @@ public void testFastQueryLongRunning() throws SQLException {
.setJobComplete(false)
.setTotalRows(new BigInteger(String.valueOf(4L)))
.setJobReference(QUERY_JOB.toPb())
.setRows(tableRows);
.setRows(TABLE_ROWS);
when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
.thenReturn(mockQueryRes);
BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);
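For context, the behavior covered by these tests corresponds to roughly the following caller-side flow through the Connection API (a sketch only; the ConnectionSettings builder options are assumed/omitted, and no specific dataset or query is implied by this PR). The point is that result.getSchema() is populated even when the backend leaves the schema off the first results page:

// Sketch of caller-side usage; imports from com.google.cloud.bigquery assumed.
static void runFastQuery() throws java.sql.SQLException {
  BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
  ConnectionSettings settings = ConnectionSettings.newBuilder().build(); // options omitted for brevity
  Connection connection = bigquery.createConnection(settings);
  BigQueryResult result =
      connection.executeSelect("SELECT corpus FROM `bigquery-public-data.samples.shakespeare`");
  // Backed by the dry-run fallback when the first page had no schema.
  Schema schema = result.getSchema();
  System.out.println(schema.getFields());
}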