@@ -45,7 +45,7 @@ public class RemoteBigQueryHelper {
private static final String MODEL_NAME_PREFIX = "model_";
private static final String ROUTINE_NAME_PREFIX = "routine_";
private final BigQueryOptions options;
private static final int connectTimeout = 60000;
private static final int CONNECT_TIMEOUT_IN_MS = 60000;

private RemoteBigQueryHelper(BigQueryOptions options) {
this.options = options;
@@ -96,8 +96,8 @@ public static RemoteBigQueryHelper create(String projectId, InputStream keyStrea
HttpTransportOptions transportOptions = BigQueryOptions.getDefaultHttpTransportOptions();
transportOptions =
transportOptions.toBuilder()
.setConnectTimeout(connectTimeout)
.setReadTimeout(connectTimeout)
.setConnectTimeout(CONNECT_TIMEOUT_IN_MS)
.setReadTimeout(CONNECT_TIMEOUT_IN_MS)
.build();
BigQueryOptions bigqueryOptions =
BigQueryOptions.newBuilder()
@@ -133,8 +133,8 @@ public static RemoteBigQueryHelper create(BigQueryOptions.Builder bigqueryOption
HttpTransportOptions transportOptions = BigQueryOptions.getDefaultHttpTransportOptions();
transportOptions =
transportOptions.toBuilder()
.setConnectTimeout(connectTimeout)
.setReadTimeout(connectTimeout)
.setConnectTimeout(CONNECT_TIMEOUT_IN_MS)
.setReadTimeout(CONNECT_TIMEOUT_IN_MS)
.build();
BigQueryOptions.Builder builder =
bigqueryOptionsBuilder
@@ -143,21 +143,25 @@ public static RemoteBigQueryHelper create(BigQueryOptions.Builder bigqueryOption
return new RemoteBigQueryHelper(builder.build());
}

// Keep these settings as small as possible to minimize the total test time.
// These values can be adjusted per test case, but they serve as the defaults.
private static RetrySettings retrySettings() {
double retryDelayMultiplier = 1.0;
double backoffMultiplier = 1.5;
int maxAttempts = 10;
long initialRetryDelay = 250L;
long maxRetryDelay = 30000L;
long totalTimeOut = 120000L;
long initialRetryDelayMs = 100L; // 0.1s initial retry delay
long maxRetryDelayMs = 1000L; // 1s max delay between retries
long initialRpcTimeoutMs = 1000L; // 1s initial RPC timeout
long maxRpcTimeoutMs = 2000L; // 2s max RPC timeout
long totalTimeoutMs = 3000L; // 3s total timeout
return RetrySettings.newBuilder()
.setMaxAttempts(maxAttempts)
.setMaxRetryDelayDuration(Duration.ofMillis(maxRetryDelay))
.setTotalTimeoutDuration(Duration.ofMillis(totalTimeOut))
.setInitialRetryDelayDuration(Duration.ofMillis(initialRetryDelay))
.setRetryDelayMultiplier(retryDelayMultiplier)
.setInitialRpcTimeoutDuration(Duration.ofMillis(totalTimeOut))
.setRpcTimeoutMultiplier(retryDelayMultiplier)
.setMaxRpcTimeoutDuration(Duration.ofMillis(totalTimeOut))
.setTotalTimeoutDuration(Duration.ofMillis(totalTimeoutMs))
.setInitialRetryDelayDuration(Duration.ofMillis(initialRetryDelayMs))
.setMaxRetryDelayDuration(Duration.ofMillis(maxRetryDelayMs))
.setRetryDelayMultiplier(backoffMultiplier)
.setInitialRpcTimeoutDuration(Duration.ofMillis(initialRpcTimeoutMs))
.setMaxRpcTimeoutDuration(Duration.ofMillis(maxRpcTimeoutMs))
.setRpcTimeoutMultiplier(backoffMultiplier)
.build();
}

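Editor's note on the retrySettings() change above: the tightened values become the defaults for every test client built through RemoteBigQueryHelper, and the comment's "can be adjusted per test case" would look roughly like the sketch below. The class and method names (SlowTestOptions, withLongerRetries) and the two-minute budget are illustrative and not part of this PR; the builder calls are the same gax/ServiceOptions APIs the diff itself uses.

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import java.time.Duration;

// Sketch only: a test that genuinely needs a longer window can widen the
// helper's defaults for itself instead of changing them globally.
class SlowTestOptions {
  static BigQueryOptions withLongerRetries(BigQueryOptions base) {
    RetrySettings longer =
        base.getRetrySettings().toBuilder()
            .setTotalTimeoutDuration(Duration.ofMinutes(2)) // hypothetical per-test budget
            .setMaxAttempts(10)
            .build();
    return base.toBuilder().setRetrySettings(longer).build();
  }
}

A test would then build its client from withLongerRetries(helper.getOptions()).getService() rather than from the helper's options directly.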
@@ -477,7 +477,7 @@ class ITBigQueryTest {
Field.newBuilder("BooleanField", LegacySQLTypeName.BOOLEAN)
.setDescription("BooleanDescription")
.build();
private static final Schema DDL_TABLE_SCHEMA =
private static final Schema SIMPLE_TABLE_SCHEMA =
Schema.of(DDL_TIMESTAMP_FIELD_SCHEMA, DDL_STRING_FIELD_SCHEMA, DDL_BOOLEAN_FIELD_SCHEMA);
private static final Schema LARGE_TABLE_SCHEMA =
Schema.of(
@@ -622,7 +622,7 @@ class ITBigQueryTest {
private static final String EXTRACT_MODEL_FILE = "extract_model.csv";
private static final String BUCKET = RemoteStorageHelper.generateBucketName();
private static final TableId TABLE_ID = TableId.of(DATASET, generateTableName("testing_table"));
private static final TableId TABLE_ID_DDL =
private static final TableId TABLE_ID_SIMPLE =
TableId.of(DATASET, generateTableName("ddl_testing_table"));
private static final TableId TABLE_ID_FAST_QUERY =
TableId.of(DATASET, generateTableName("fast_query_testing_table"));
@@ -1163,9 +1163,11 @@ static void beforeClass() throws InterruptedException, IOException {

LoadJobConfiguration configurationDDL =
LoadJobConfiguration.newBuilder(
TABLE_ID_DDL, "gs://" + BUCKET + "/" + JSON_LOAD_FILE_SIMPLE, FormatOptions.json())
TABLE_ID_SIMPLE,
"gs://" + BUCKET + "/" + JSON_LOAD_FILE_SIMPLE,
FormatOptions.json())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(DDL_TABLE_SCHEMA)
.setSchema(SIMPLE_TABLE_SCHEMA)
.setLabels(labels)
.build();
Job jobDDL = bigquery.create(JobInfo.of(configurationDDL));
@@ -4773,24 +4775,41 @@ void testFastSQLQueryMultiPage() throws InterruptedException {

@Test
void testFastDMLQuery() throws InterruptedException {
String tableName = TABLE_ID_FAST_QUERY.getTable();
// The test runs an update query. Clone the table to ensure that this doesn't impact
// other tests.
String tableName = generateTableName("test_table_fast_query_dml");
String tableNameFastQuery = TABLE_ID_SIMPLE.getTable();
String ddlQuery =
String.format(
"CREATE OR REPLACE TABLE %s ("
+ "TimestampField TIMESTAMP OPTIONS(description='TimestampDescription'), "
+ "StringField STRING OPTIONS(description='StringDescription'), "
+ "BooleanField BOOLEAN OPTIONS(description='BooleanDescription') "
+ ") AS SELECT DISTINCT * FROM %s",
tableName, tableNameFastQuery);
QueryJobConfiguration ddlConfig =
QueryJobConfiguration.newBuilder(ddlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result = bigquery.query(ddlConfig);
assertNotNull(result.getJobId());

String dmlQuery =
String.format("UPDATE %s.%s SET StringField = 'hello' WHERE TRUE", DATASET, tableName);
QueryJobConfiguration dmlConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();
TableResult result = bigquery.query(dmlConfig);
assertNotNull(result.getJobId());
assertEquals(TABLE_SCHEMA, result.getSchema());
TableResult resultAfterDML = bigquery.query(dmlConfig);
assertNotNull(resultAfterDML.getJobId());
assertEquals(SIMPLE_TABLE_SCHEMA, resultAfterDML.getSchema());
// Using the job reference on the TableResult, lookup and verify DML statistics.
Job queryJob = bigquery.getJob(result.getJobId());
Job queryJob = bigquery.getJob(resultAfterDML.getJobId());
queryJob = queryJob.waitFor();
JobStatistics.QueryStatistics statistics = queryJob.getStatistics();
assertEquals(2L, statistics.getNumDmlAffectedRows().longValue());
assertEquals(2L, statistics.getDmlStats().getUpdatedRowCount().longValue());
assertEquals(1L, statistics.getNumDmlAffectedRows().longValue());
assertEquals(1L, statistics.getDmlStats().getUpdatedRowCount().longValue());
}

@Test
void testFastDDLQuery() throws InterruptedException {
String tableName = "test_table_fast_query_ddl";
String tableNameFastQuery = TABLE_ID_DDL.getTable();
String tableName = generateTableName("test_table_fast_query_ddl");
String tableNameFastQuery = TABLE_ID_SIMPLE.getTable();
String ddlQuery =
String.format(
"CREATE OR REPLACE TABLE %s ("
@@ -4803,7 +4822,7 @@ void testFastDDLQuery() throws InterruptedException {
QueryJobConfiguration.newBuilder(ddlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result = bigquery.query(ddlConfig);
assertNotNull(result.getJobId());
assertEquals(DDL_TABLE_SCHEMA, result.getSchema());
assertEquals(SIMPLE_TABLE_SCHEMA, result.getSchema());
assertEquals(0, result.getTotalRows());
// Verify correctness of table content
String sqlQuery = String.format("SELECT * FROM %s.%s", DATASET, tableName);
@@ -5078,29 +5097,61 @@ void testExecuteSelectSessionSupport() throws BigQuerySQLException {

@Test
void testDmlStatistics() throws InterruptedException {
String tableName = TABLE_ID_FAST_QUERY.getTable();
// Run a DML statement to UPDATE 2 rows of data
// This runs an update SQL query. Clone the table to ensure that this doesn't impact
// other tests.
String tableName = generateTableName("test_table_dml_stats");
String tableNameSimple = TABLE_ID_SIMPLE.getTable();
String ddlQuery =
String.format(
"CREATE OR REPLACE TABLE %s ("
+ "TimestampField TIMESTAMP OPTIONS(description='TimestampDescription'), "
+ "StringField STRING OPTIONS(description='StringDescription'), "
+ "BooleanField BOOLEAN OPTIONS(description='BooleanDescription') "
+ ") AS SELECT DISTINCT * FROM %s",
tableName, tableNameSimple);
QueryJobConfiguration ddlConfig =
QueryJobConfiguration.newBuilder(ddlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result = bigquery.query(ddlConfig);
assertNotNull(result.getJobId());

String dmlQuery =
String.format("UPDATE %s.%s SET StringField = 'hello' WHERE TRUE", DATASET, tableName);
QueryJobConfiguration dmlConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();
Job remoteJob = bigquery.create(JobInfo.of(dmlConfig));
remoteJob = remoteJob.waitFor();
assertNull(remoteJob.getStatus().getError());

TableResult result = remoteJob.getQueryResults();
assertNotNull(result.getJobId());
assertEquals(TABLE_SCHEMA, result.getSchema());
TableResult resultAfterUpdate = remoteJob.getQueryResults();
assertNotNull(resultAfterUpdate.getJobId());
assertEquals(SIMPLE_TABLE_SCHEMA, resultAfterUpdate.getSchema());

Job queryJob = bigquery.getJob(remoteJob.getJobId());
queryJob = queryJob.waitFor();
JobStatistics.QueryStatistics statistics = queryJob.getStatistics();
assertEquals(2L, statistics.getNumDmlAffectedRows().longValue());
assertEquals(2L, statistics.getDmlStats().getUpdatedRowCount().longValue());
assertEquals(1L, statistics.getNumDmlAffectedRows().longValue());
assertEquals(1L, statistics.getDmlStats().getUpdatedRowCount().longValue());
}

/* TODO(prasmish): replicate the entire test case for executeSelect */
@Test
void testTransactionInfo() throws InterruptedException {
String tableName = TABLE_ID_FAST_QUERY.getTable();
// The transaction runs an update query. Clone the table to ensure that this doesn't impact
// other tests.
String tableName = generateTableName("test_table_transaction_info");
String tableNameSimple = TABLE_ID_SIMPLE.getTable();
String ddlQuery =
String.format(
"CREATE OR REPLACE TABLE %s ("
+ "TimestampField TIMESTAMP OPTIONS(description='TimestampDescription'), "
+ "StringField STRING OPTIONS(description='StringDescription'), "
+ "BooleanField BOOLEAN OPTIONS(description='BooleanDescription') "
+ ") AS SELECT DISTINCT * FROM %s",
tableName, tableNameSimple);
QueryJobConfiguration ddlConfig =
QueryJobConfiguration.newBuilder(ddlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result = bigquery.query(ddlConfig);
assertNotNull(result.getJobId());

String transaction =
String.format(
"BEGIN TRANSACTION;\n"
@@ -5913,7 +5964,7 @@ void testCopyJobStatistics() throws InterruptedException, TimeoutException {
@Test
void testSnapshotTableCopyJob() throws InterruptedException {
String sourceTableName = "test_copy_job_base_table";
String ddlTableName = TABLE_ID_DDL.getTable();
String ddlTableName = TABLE_ID_SIMPLE.getTable();
// this creates a snapshot table at specified snapshotTime
String snapshotTableName = "test_snapshot_table";
// Create source table with some data in it
@@ -5930,7 +5981,7 @@ void testSnapshotTableCopyJob() throws InterruptedException {
TableId sourceTableId = TableId.of(DATASET, sourceTableName);
TableResult result = bigquery.query(ddlConfig);
assertNotNull(result.getJobId());
assertEquals(DDL_TABLE_SCHEMA, result.getSchema());
assertEquals(SIMPLE_TABLE_SCHEMA, result.getSchema());
Table remoteTable = bigquery.getTable(DATASET, sourceTableName);
assertNotNull(remoteTable);

@@ -5952,7 +6003,7 @@ void testSnapshotTableCopyJob() throws InterruptedException {
assertEquals(snapshotTableId.getDataset(), snapshotTable.getTableId().getDataset());
assertEquals(snapshotTableName, snapshotTable.getTableId().getTable());
assertTrue(snapshotTable.getDefinition() instanceof SnapshotTableDefinition);
assertEquals(DDL_TABLE_SCHEMA, snapshotTable.getDefinition().getSchema());
assertEquals(SIMPLE_TABLE_SCHEMA, snapshotTable.getDefinition().getSchema());
assertNotNull(((SnapshotTableDefinition) snapshotTable.getDefinition()).getSnapshotTime());
assertEquals(
sourceTableName,
@@ -5978,7 +6029,7 @@ void testSnapshotTableCopyJob() throws InterruptedException {
assertNotNull(restoredTable);
assertEquals(restoredTableId.getDataset(), restoredTable.getTableId().getDataset());
assertEquals(restoredTableName, restoredTable.getTableId().getTable());
assertEquals(DDL_TABLE_SCHEMA, restoredTable.getDefinition().getSchema());
assertEquals(SIMPLE_TABLE_SCHEMA, restoredTable.getDefinition().getSchema());
assertEquals(snapshotTable.getNumBytes(), restoredTable.getNumBytes());
assertEquals(snapshotTable.getNumRows(), restoredTable.getNumRows());

@@ -6857,7 +6908,7 @@ void testCreateExternalTableWithReferenceFileSchemaParquet() {
@Test
void testCloneTableCopyJob() throws InterruptedException {
String sourceTableName = "test_copy_job_base_table";
String ddlTableName = TABLE_ID_DDL.getTable();
String ddlTableName = TABLE_ID_SIMPLE.getTable();
String cloneTableName = "test_clone_table";
// Create source table with some data in it
String ddlQuery =
@@ -6873,7 +6924,7 @@ void testCloneTableCopyJob() throws InterruptedException {
TableId sourceTableId = TableId.of(DATASET, sourceTableName);
TableResult result = bigquery.query(ddlConfig);
assertNotNull(result.getJobId());
assertEquals(DDL_TABLE_SCHEMA, result.getSchema());
assertEquals(SIMPLE_TABLE_SCHEMA, result.getSchema());
Table remoteTable = bigquery.getTable(DATASET, sourceTableName);
assertNotNull(remoteTable);

@@ -6897,7 +6948,7 @@ void testCloneTableCopyJob() throws InterruptedException {
assertEquals(cloneTableName, cloneTable.getTableId().getTable());
assertEquals(TableDefinition.Type.TABLE, cloneTable.getDefinition().getType());
assertTrue(cloneTable.getDefinition() instanceof StandardTableDefinition);
assertEquals(DDL_TABLE_SCHEMA, cloneTable.getDefinition().getSchema());
assertEquals(SIMPLE_TABLE_SCHEMA, cloneTable.getDefinition().getSchema());
assertTrue(cloneTable.getCloneDefinition() instanceof CloneDefinition);
assertEquals(sourceTableName, cloneTable.getCloneDefinition().getBaseTableId().getTable());
assertNotNull(cloneTable.getCloneDefinition().getCloneTime());
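Editor's note: the DML-oriented tests updated above (testFastDMLQuery, testDmlStatistics, testTransactionInfo) now each clone TABLE_ID_SIMPLE with the same CREATE OR REPLACE ... AS SELECT DISTINCT * statement before issuing their UPDATE. A follow-up could factor that into a single helper inside ITBigQueryTest; the sketch below is hypothetical (the cloneSimpleTable name is not in this PR) and only rearranges code that already appears in the hunks above.

// Hypothetical consolidation of the repeated table-clone DDL used by the DML
// tests; the DDL string, fields, and assertions are copied from the diff above.
private static void cloneSimpleTable(String newTableName) throws InterruptedException {
  String ddlQuery =
      String.format(
          "CREATE OR REPLACE TABLE %s ("
              + "TimestampField TIMESTAMP OPTIONS(description='TimestampDescription'), "
              + "StringField STRING OPTIONS(description='StringDescription'), "
              + "BooleanField BOOLEAN OPTIONS(description='BooleanDescription') "
              + ") AS SELECT DISTINCT * FROM %s",
          newTableName, TABLE_ID_SIMPLE.getTable());
  QueryJobConfiguration ddlConfig =
      QueryJobConfiguration.newBuilder(ddlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
  TableResult result = bigquery.query(ddlConfig);
  assertNotNull(result.getJobId());
}

Each test would then open with String tableName = generateTableName(...); cloneSimpleTable(tableName); before running its UPDATE.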
@@ -26,14 +26,13 @@
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class RemoteBigQueryHelperTest {
class RemoteBigQueryHelperTest {

private static final String DATASET_NAME = "dataset-name";
private static final String PROJECT_ID = "project-id";
@@ -67,7 +66,7 @@ public class RemoteBigQueryHelperTest {
private static final InputStream JSON_KEY_STREAM = new ByteArrayInputStream(JSON_KEY.getBytes());

@Test
public void testForceDelete() throws InterruptedException, ExecutionException {
void testForceDelete() {
BigQuery bigqueryMock = Mockito.mock(BigQuery.class);
Mockito.when(bigqueryMock.delete(DATASET_NAME, DatasetDeleteOption.deleteContents()))
.thenReturn(true);
@@ -76,15 +75,15 @@ public void testForceDelete() throws InterruptedException, ExecutionException {
}

@Test
public void testCreateFromStream() {
void testCreateFromStream() {
RemoteBigQueryHelper helper = RemoteBigQueryHelper.create(PROJECT_ID, JSON_KEY_STREAM);
BigQueryOptions options = helper.getOptions();
assertEquals(PROJECT_ID, options.getProjectId());
assertEquals(60000, ((HttpTransportOptions) options.getTransportOptions()).getConnectTimeout());
assertEquals(60000, ((HttpTransportOptions) options.getTransportOptions()).getReadTimeout());
assertEquals(10, options.getRetrySettings().getMaxAttempts());
assertEquals(Duration.ofMillis(30000), options.getRetrySettings().getMaxRetryDelayDuration());
assertEquals(Duration.ofMillis(120000), options.getRetrySettings().getTotalTimeoutDuration());
assertEquals(Duration.ofMillis(250), options.getRetrySettings().getInitialRetryDelayDuration());
assertEquals(Duration.ofMillis(1000), options.getRetrySettings().getMaxRetryDelayDuration());
assertEquals(Duration.ofMillis(3000), options.getRetrySettings().getTotalTimeoutDuration());
assertEquals(Duration.ofMillis(100), options.getRetrySettings().getInitialRetryDelayDuration());
}
}