diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java index be5174b26..3a9937a07 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java @@ -192,7 +192,7 @@ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException { if (isFastQuerySupported()) { String projectId = bigQueryOptions.getProjectId(); QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null); - return queryRpc(projectId, queryRequest, false); + return queryRpc(projectId, queryRequest, sql, false); } // use jobs.insert otherwise com.google.api.services.bigquery.model.Job queryJob = @@ -236,7 +236,7 @@ public BigQueryResult executeSelect( final String projectId = bigQueryOptions.getProjectId(); final QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, parameters, labelMap); - return queryRpc(projectId, queryRequest, parameters != null); + return queryRpc(projectId, queryRequest, sql, parameters != null); } // use jobs.insert otherwise com.google.api.services.bigquery.model.Job queryJob = @@ -289,7 +289,10 @@ public int size() { } private BigQueryResult queryRpc( - final String projectId, final QueryRequest queryRequest, Boolean hasQueryParameters) { + final String projectId, + final QueryRequest queryRequest, + String sql, + Boolean hasQueryParameters) { com.google.api.services.bigquery.model.QueryResponse results; try { results = @@ -322,8 +325,29 @@ private BigQueryResult queryRpc( // and can be optimized here, but this is left as future work. Long totalRows = results.getTotalRows() == null ? null : results.getTotalRows().longValue(); Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size()); + logger.log( + Level.WARNING, + "\n" + + String.format( + "results.getJobComplete(): %s, isSchemaNull: %s , totalRows: %s, pageRows: %s", + results.getJobComplete(), results.getSchema() == null, totalRows, pageRows)); JobId jobId = JobId.fromPb(results.getJobReference()); GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId); + // We might get null schema from the backend occasionally. Ref: + // https://github.com/googleapis/java-bigquery/issues/2103/. Using queryDryRun in such cases + // to get the schema + if (firstPage.getSchema() == null) { // get schema using dry run + // Log the status if the job was complete complete + logger.log( + Level.WARNING, + "\n" + + "Received null schema, Using dryRun the get the Schema. jobComplete:" + + firstPage.getJobComplete()); + com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql); + Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema()); + return getSubsequentQueryResultsWithJob( + totalRows, pageRows, jobId, firstPage, schema, hasQueryParameters); + } return getSubsequentQueryResultsWithJob( totalRows, pageRows, jobId, firstPage, hasQueryParameters); } @@ -1243,7 +1267,8 @@ com.google.api.services.bigquery.model.Job createQueryJob( } // Used by dryRun - private com.google.api.services.bigquery.model.Job createDryRunJob(String sql) { + @VisibleForTesting + com.google.api.services.bigquery.model.Job createDryRunJob(String sql) { com.google.api.services.bigquery.model.JobConfiguration configurationPb = new com.google.api.services.bigquery.model.JobConfiguration(); configurationPb.setDryRun(true); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java index e4fdc9731..9543ccebf 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java @@ -93,6 +93,26 @@ public class ConnectionImplTest { .setTotalRows(BigInteger.valueOf(1L)) .setSchema(FAST_QUERY_TABLESCHEMA); + private static final GetQueryResultsResponse GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA = + new GetQueryResultsResponse() + .setJobReference(QUERY_JOB.toPb()) + .setRows(ImmutableList.of(TABLE_ROW)) + .setJobComplete(false) + .setPageToken(PAGE_TOKEN) + .setTotalBytesProcessed(42L) + .setTotalRows(BigInteger.valueOf(1L)) + .setSchema(null); + + private static List TABLE_ROWS = + ImmutableList.of( + new TableRow() + .setF( + ImmutableList.of(new TableCell().setV("Value1"), new TableCell().setV("Value2"))), + new TableRow() + .setF( + ImmutableList.of( + new TableCell().setV("Value3"), new TableCell().setV("Value4")))); + private BigQueryOptions createBigQueryOptionsForProject( String project, BigQueryRpcFactory rpcFactory) { return BigQueryOptions.newBuilder() @@ -211,24 +231,13 @@ public void testQueryDryRun() throws BigQuerySQLException { @Test public void testParseDataTask() throws InterruptedException { - List tableRows = - ImmutableList.of( - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value1"), new TableCell().setV("Value2"))), - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value3"), new TableCell().setV("Value4")))); - BlockingQueue, Boolean>> pageCache = new LinkedBlockingDeque<>(2); BlockingQueue> rpcResponseQueue = new LinkedBlockingDeque<>(2); rpcResponseQueue.offer(Tuple.of(null, false)); // This call should populate page cache ConnectionImpl connectionSpy = Mockito.spy(connection); - connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue); + connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue); Tuple, Boolean> fvlTupple = pageCache.take(); // wait for the parser thread to parse the data assertNotNull(fvlTupple); @@ -247,16 +256,6 @@ public void testParseDataTask() throws InterruptedException { @Test public void testPopulateBuffer() throws InterruptedException { - List tableRows = - ImmutableList.of( - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value1"), new TableCell().setV("Value2"))), - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value3"), new TableCell().setV("Value4")))); BlockingQueue, Boolean>> pageCache = new LinkedBlockingDeque<>(2); @@ -266,7 +265,7 @@ public void testPopulateBuffer() throws InterruptedException { // This call should populate page cache ConnectionImpl connectionSpy = Mockito.spy(connection); - connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue); + connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue); verify(connectionSpy, times(1)) .parseRpcDataAsync( @@ -358,19 +357,62 @@ public void testLegacyQuerySinglePage() throws BigQuerySQLException { .createJobForQuery(any(com.google.api.services.bigquery.model.Job.class)); } + // calls executeSelect with a Fast query and emulates that no schema is returned with the first + // page + @Test + public void testFastQueryNullSchema() throws BigQuerySQLException { + ConnectionImpl connectionSpy = Mockito.spy(connection); + QueryRequest queryReqMock = new QueryRequest(); + com.google.api.services.bigquery.model.JobStatistics stats = + new com.google.api.services.bigquery.model.JobStatistics() + .setQuery(new JobStatistics2().setSchema(FAST_QUERY_TABLESCHEMA)); + com.google.api.services.bigquery.model.Job jobResponseMock = + new com.google.api.services.bigquery.model.Job() + // .setConfiguration(QUERY_JOB.g) + .setJobReference(QUERY_JOB.toPb()) + .setId(JOB) + .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE")) + .setStatistics(stats); + // emulating a legacy query + doReturn(true).when(connectionSpy).isFastQuerySupported(); + com.google.api.services.bigquery.model.QueryResponse mockQueryRes = + new QueryResponse() + .setSchema(FAST_QUERY_TABLESCHEMA) + .setJobComplete(false) // so that it goes to the else part in queryRpc + .setTotalRows(new BigInteger(String.valueOf(4L))) + .setJobReference(QUERY_JOB.toPb()) + .setRows(TABLE_ROWS); + when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class))) + .thenReturn(mockQueryRes); + doReturn(GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA) // wiring the null schema for the test case + .when(connectionSpy) + .getQueryResultsFirstPage(any(JobId.class)); + doReturn(BQ_RS_MOCK_RES) + .when(connectionSpy) + .getSubsequentQueryResultsWithJob( + any(Long.class), + any(Long.class), + any(JobId.class), + any(GetQueryResultsResponse.class), + any(Schema.class), + any(Boolean.class)); + doReturn(jobResponseMock).when(connectionSpy).createDryRunJob(any(String.class)); + BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY); + assertEquals(res.getTotalRows(), 2); + assertEquals(QUERY_SCHEMA, res.getSchema()); + verify(connectionSpy, times(1)) + .getSubsequentQueryResultsWithJob( + any(Long.class), + any(Long.class), + any(JobId.class), + any(GetQueryResultsResponse.class), + any(Schema.class), + any(Boolean.class)); + } + // exercises getSubsequentQueryResultsWithJob for fast running queries @Test public void testFastQueryLongRunning() throws SQLException { - List tableRows = - ImmutableList.of( - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value1"), new TableCell().setV("Value2"))), - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value3"), new TableCell().setV("Value4")))); ConnectionImpl connectionSpy = Mockito.spy(connection); // emulating a fast query doReturn(true).when(connectionSpy).isFastQuerySupported(); @@ -389,7 +431,7 @@ public void testFastQueryLongRunning() throws SQLException { .setJobComplete(false) .setTotalRows(new BigInteger(String.valueOf(4L))) .setJobReference(QUERY_JOB.toPb()) - .setRows(tableRows); + .setRows(TABLE_ROWS); when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class))) .thenReturn(mockQueryRes); BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);