From b15f71e9d9d7fead804b1064c82e070f70660708 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 15 Dec 2025 18:32:55 -0500 Subject: [PATCH 1/8] test: Add ITs for timestamps for Read API --- .../it/ITBigQueryStorageReadClientTest.java | 209 +++++++++++++----- .../storage/v1/it/SimpleRowReaderArrow.java | 32 ++- 2 files changed, 178 insertions(+), 63 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java index d026210b09..c468bde8d1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -123,6 +124,8 @@ public class ITBigQueryStorageReadClientTest { Logger.getLogger(ITBigQueryStorageReadClientTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String DESCRIPTION = "BigQuery Storage Java client test dataset"; + private static final int SHAKESPEARE_SAMPLE_ROW_COUNT = 164_656; + private static final int MAX_STREAM_COUNT = 1; private static BigQueryReadClient client; private static String projectName; @@ -270,7 +273,7 @@ public class ITBigQueryStorageReadClientTest { .build()) .build(); - private static final ImmutableMap RANGE_TEST_VALUES_DATES = + private static final Map RANGE_TEST_VALUES_DATES = new ImmutableMap.Builder() .put( "bounded", @@ -303,7 +306,7 @@ public class ITBigQueryStorageReadClientTest { .build(); // dates are returned as days since epoch - private static final ImmutableMap RANGE_TEST_VALUES_EXPECTED_DATES = + private static final Map RANGE_TEST_VALUES_EXPECTED_DATES = new ImmutableMap.Builder() .put( "bounded", @@ -562,7 +565,7 @@ public void testSimpleReadAvro() { rowCount += response.getRowCount(); } - assertEquals(164_656, rowCount); + assertEquals(SHAKESPEARE_SAMPLE_ROW_COUNT, rowCount); } @Test @@ -608,7 +611,7 @@ public void testSimpleReadArrow() { Preconditions.checkState(response.hasArrowRecordBatch()); rowCount += response.getRowCount(); } - assertEquals(164_656, rowCount); + assertEquals(SHAKESPEARE_SAMPLE_ROW_COUNT, rowCount); } @Test @@ -799,6 +802,105 @@ public void testRangeTypeWrite() } } + @Test + public void timestamp_readArrow() + throws InterruptedException, IOException, DescriptorValidationException { + com.google.cloud.bigquery.Schema timestampSchema = + com.google.cloud.bigquery.Schema.of( + Field.newBuilder("timestamp", StandardSQLTypeName.TIMESTAMP) + .setMode(Mode.NULLABLE) + .build()); + + TableSchema timestampTableSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("timestamp") + .setType(TableFieldSchema.Type.TIMESTAMP) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .build(); + + // Create table with Range fields. 
+ String tableName = "test_timestamp_read_arrow"; + TableId tableId = TableId.of(DATASET, tableName); + bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(timestampSchema))); + + TableName parentTable = TableName.of(projectName, DATASET, tableName); + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelayDuration(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelayDuration(Duration.ofSeconds(10)) + .build(); + + Long[] timestampsMicros = + new Long[] { + 1735734896123456L, // 2025-01-01T12:34:56.123456Z + 1580646896123456L, // 2020-02-02T12:34:56.123456Z + 636467696123456L, // 1990-03-03T12:34:56.123456Z + 165846896123456L // 1975-04-04T12:34:56.123456Z + }; + + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(parentTable.toString(), timestampTableSchema) + .setRetrySettings(retrySettings) + .build()) { + JSONArray data = new JSONArray(); + for (long timestampMicro : timestampsMicros) { + JSONObject row = new JSONObject(); + row.put("timestamp", timestampMicro); + data.put(row); + } + + ApiFuture future = writer.append(data); + // The append method is asynchronous. Rather than waiting for the method to complete, + // which can hurt performance, register a completion callback and continue streaming. + ApiFutures.addCallback(future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); + } + + String table = BigQueryResource.formatTableResource(projectName, DATASET, tableId.getTable()); + ReadSession session = + client.createReadSession( + parentProjectId, + ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.ARROW).build(), + MAX_STREAM_COUNT); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession" + + " response:%n%s", + table, session.toString()), + MAX_STREAM_COUNT, + session.getStreamsCount()); + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results + // directly. + assertThat(session.getStreamsCount()).isGreaterThan(0); + + // Set up a simple reader and start a read session. + try (SimpleRowReaderArrow reader = new SimpleRowReaderArrow(session.getArrowSchema())) { + // Use the first stream to perform reading. + String streamName = session.getStreams(0).getName(); + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).build(); + + long rowCount = 0; + // Process each block of rows as they arrive and decode using our simple row reader. + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + reader.processRows( + response.getArrowRecordBatch(), + new SimpleRowReaderArrow.ArrowTimestampBatchConsumer(Arrays.asList(timestampsMicros))); + rowCount += response.getRowCount(); + } + assertEquals(timestampsMicros.length, rowCount); + } + } + @Test public void testSimpleReadAndResume() { String table = @@ -825,7 +927,7 @@ public void testSimpleReadAndResume() { // We have to read some number of rows in order to be able to resume. 
More details: - long rowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset= */ 34_846); + long rowCount = readStreamToOffset(session.getStreams(0), /* rowOffset= */ 34_846); ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder() @@ -841,7 +943,7 @@ public void testSimpleReadAndResume() { // Verifies that the number of rows skipped and read equals to the total number of rows in the // table. - assertEquals(164_656, rowCount); + assertEquals(SHAKESPEARE_SAMPLE_ROW_COUNT, rowCount); } @Test @@ -994,11 +1096,11 @@ public void testReadAtSnapshot() throws InterruptedException, IOException { bigquery.create(TableInfo.of(testTableId, StandardTableDefinition.of(tableSchema))); Job firstJob = - RunQueryAppendJobAndExpectSuccess( + runQueryAppendJobAndExpectSuccess( /* destinationTableId= */ testTableId, /* query= */ "SELECT 1 AS col"); Job secondJob = - RunQueryAppendJobAndExpectSuccess( + runQueryAppendJobAndExpectSuccess( /* destinationTableId= */ testTableId, /* query= */ "SELECT 2 AS col"); String table = @@ -1008,7 +1110,7 @@ public void testReadAtSnapshot() throws InterruptedException, IOException { /* tableId= */ testTableId.getTable()); final List rowsAfterFirstSnapshot = new ArrayList<>(); - ProcessRowsAtSnapshot( + processRowsAtSnapshot( /* table= */ table, /* snapshotInMillis= */ firstJob.getStatistics().getEndTime(), /* filter= */ null, @@ -1021,7 +1123,7 @@ public void accept(GenericData.Record record) { assertEquals(Collections.singletonList(1L), rowsAfterFirstSnapshot); final List rowsAfterSecondSnapshot = new ArrayList<>(); - ProcessRowsAtSnapshot( + processRowsAtSnapshot( /* table= */ table, /* snapshotInMillis= */ secondJob.getStatistics().getEndTime(), /* filter= */ null, @@ -1053,7 +1155,7 @@ public void testColumnPartitionedTableByDateField() throws InterruptedException, + " SELECT 3, CAST(\"2019-01-03\" AS DATE)", DATASET, partitionedTableName); - RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + runQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); String table = BigQueryResource.formatTableResource( @@ -1061,11 +1163,11 @@ public void testColumnPartitionedTableByDateField() throws InterruptedException, /* datasetId= */ DATASET, /* tableId= */ partitionedTableName); - List unfilteredRows = ReadAllRows(/* table= */ table, /* filter= */ null); + List unfilteredRows = readAllRows(/* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + unfilteredRows.toString(), 3, unfilteredRows.size()); List partitionFilteredRows = - ReadAllRows(/* table= */ table, /* filter= */ "date_field = CAST(\"2019-01-02\" AS DATE)"); + readAllRows(/* table= */ table, /* filter= */ "date_field = CAST(\"2019-01-02\" AS DATE)"); assertEquals( "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size()); assertEquals(2L, partitionFilteredRows.get(0).get("num_field")); @@ -1092,13 +1194,13 @@ public void testIngestionTimePartitionedTable() throws InterruptedException, IOE .build())); // Simulate ingestion for 2019-01-01. - RunQueryAppendJobAndExpectSuccess( + runQueryAppendJobAndExpectSuccess( /* destinationTableId= */ TableId.of( /* dataset= */ DATASET, /* table= */ testTableId.getTable() + "$20190101"), /* query= */ "SELECT 1 AS num_field"); // Simulate ingestion for 2019-01-02. 
- RunQueryAppendJobAndExpectSuccess( + runQueryAppendJobAndExpectSuccess( /* destinationTableId= */ TableId.of( /* dataset= */ DATASET, /* table= */ testTableId.getTable() + "$20190102"), /* query= */ "SELECT 2 AS num_field"); @@ -1109,11 +1211,11 @@ public void testIngestionTimePartitionedTable() throws InterruptedException, IOE /* datasetId= */ testTableId.getDataset(), /* tableId= */ testTableId.getTable()); - List unfilteredRows = ReadAllRows(/* table= */ table, /* filter= */ null); + List unfilteredRows = readAllRows(/* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + unfilteredRows.toString(), 2, unfilteredRows.size()); List partitionFilteredRows = - ReadAllRows(/* table= */ table, /* filter= */ "_PARTITIONDATE > \"2019-01-01\""); + readAllRows(/* table= */ table, /* filter= */ "_PARTITIONDATE > \"2019-01-01\""); assertEquals( "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size()); assertEquals(2L, partitionFilteredRows.get(0).get("num_field")); @@ -1144,13 +1246,13 @@ public void testBasicSqlTypes() throws InterruptedException, IOException { + " b\"абвгд\"", DATASET, tableName); - RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + runQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); String table = BigQueryResource.formatTableResource( /* projectId= */ projectName, /* datasetId= */ DATASET, /* tableId= */ tableName); - List rows = ReadAllRows(/* table= */ table, /* filter= */ null); + List rows = readAllRows(/* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -1241,13 +1343,13 @@ public void testDateAndTimeSqlTypes() throws InterruptedException, IOException { + " CAST(\"2019-04-30 19:24:19.123456 UTC\" AS TIMESTAMP)", DATASET, tableName); - RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + runQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); String table = BigQueryResource.formatTableResource( /* projectId= */ projectName, /* datasetId= */ DATASET, /* tableId= */ tableName); - List rows = ReadAllRows(/* table= */ table, /* filter= */ null); + List rows = readAllRows(/* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -1336,13 +1438,13 @@ public void testGeographySqlType() throws InterruptedException, IOException { + " SELECT ST_GEOGPOINT(1.1, 2.2)", DATASET, tableName); - RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + runQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); String table = BigQueryResource.formatTableResource( /* projectId= */ projectName, /* datasetId= */ DATASET, /* tableId= */ tableName); - List rows = ReadAllRows(/* table= */ table, /* filter= */ null); + List rows = readAllRows(/* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -1379,13 +1481,13 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio + " (10, 'abc')", DATASET, tableName); - RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + runQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); 
String table = BigQueryResource.formatTableResource( /* projectId= */ projectName, /* datasetId= */ DATASET, /* tableId= */ tableName); - List rows = ReadAllRows(/* table= */ table, /* filter= */ null); + List rows = readAllRows(/* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -1432,8 +1534,7 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio } @Test - public void testSimpleReadWithBackgroundExecutorProvider() - throws IOException, InterruptedException { + public void testSimpleReadWithBackgroundExecutorProvider() throws IOException { BigQueryReadSettings bigQueryReadSettings = BigQueryReadSettings.newBuilder() .setBackgroundExecutorProvider( @@ -1480,7 +1581,7 @@ public void testSimpleReadWithBackgroundExecutorProvider() rowCount += response.getRowCount(); } - assertEquals(164_656, rowCount); + assertEquals(SHAKESPEARE_SAMPLE_ROW_COUNT, rowCount); } @Test @@ -1534,22 +1635,21 @@ public void testInvalidUniverseDomainWithMismatchCredentials() throws IOExceptio /* datasetId= */ "samples", /* tableId= */ "shakespeare"); - try { - ReadSession session = - localClient.createReadSession( - /* parent= */ parentProjectId, - /* readSession= */ ReadSession.newBuilder() - .setTable(table) - .setDataFormat(DataFormat.AVRO) - .build(), - /* maxStreamCount= */ 1); - fail("RPCs to invalid universe domain should fail"); - } catch (UnauthenticatedException e) { - assertThat( - (e.getMessage() - .contains("does not match the universe domain found in the credentials"))) - .isTrue(); - } + UnauthenticatedException e = + assertThrows( + "RPCs to invalid universe domain should fail", + UnauthenticatedException.class, + () -> { + localClient.createReadSession( + /* parent= */ parentProjectId, + /* readSession= */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.AVRO) + .build(), + /* maxStreamCount= */ 1); + }); + assertTrue( + e.getMessage().contains("does not match the universe domain found in the credentials")); localClient.close(); } @@ -1584,7 +1684,7 @@ public void testUniverseDomainWithMatchingDomain() throws IOException { rowCount += response.getRowCount(); } - assertEquals(164_656, rowCount); + assertEquals(SHAKESPEARE_SAMPLE_ROW_COUNT, rowCount); localClient.close(); } @@ -1650,9 +1750,9 @@ public void testSimpleReadWithOtelTracing() throws IOException { createReadSessionMap.get( AttributeKey.longKey("bq.storage.read_session.request.max_stream_count"))); assertEquals( + 1L, createReadSessionMap.get( - AttributeKey.longKey("bq.storage.read_session.request.max_stream_count")), - 1L); + AttributeKey.longKey("bq.storage.read_session.request.max_stream_count"))); } public void testUniverseDomain() throws IOException { @@ -1699,8 +1799,7 @@ public void testUniverseDomain() throws IOException { * @param rowOffset * @return the number of requested rows to skip or the total rows read if stream had less rows. */ - private long ReadStreamToOffset(ReadStream readStream, long rowOffset) { - + private long readStreamToOffset(ReadStream readStream, long rowOffset) { ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build(); @@ -1728,7 +1827,7 @@ private long ReadStreamToOffset(ReadStream readStream, long rowOffset) { * @param consumer that receives all Avro rows. 
* @throws IOException */ - private void ProcessRowsAtSnapshot( + private void processRowsAtSnapshot( String table, Long snapshotInMillis, String filter, AvroRowConsumer consumer) throws IOException { Preconditions.checkNotNull(table); @@ -1787,9 +1886,9 @@ private void ProcessRowsAtSnapshot( * @param filter Optional. If specified, it will be used to restrict returned data. * @return */ - List ReadAllRows(String table, String filter) throws IOException { + List readAllRows(String table, String filter) throws IOException { final List rows = new ArrayList<>(); - ProcessRowsAtSnapshot( + processRowsAtSnapshot( /* table= */ table, /* snapshotInMillis= */ null, /* filter= */ filter, @@ -1812,9 +1911,9 @@ public void accept(GenericData.Record record) { * @return * @throws InterruptedException */ - private Job RunQueryAppendJobAndExpectSuccess(TableId destinationTableId, String query) + private Job runQueryAppendJobAndExpectSuccess(TableId destinationTableId, String query) throws InterruptedException { - return RunQueryJobAndExpectSuccess( + return runQueryJobAndExpectSuccess( QueryJobConfiguration.newBuilder(query) .setDestinationTable(destinationTableId) .setUseQueryCache(false) @@ -1830,7 +1929,7 @@ private Job RunQueryAppendJobAndExpectSuccess(TableId destinationTableId, String * @return * @throws InterruptedException */ - private Job RunQueryJobAndExpectSuccess(QueryJobConfiguration configuration) + private Job runQueryJobAndExpectSuccess(QueryJobConfiguration configuration) throws InterruptedException { Job job = bigquery.create(JobInfo.of(configuration)); Job completedJob = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java index 685f72fbc9..7ccdf4526d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java @@ -23,7 +23,6 @@ import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch; import com.google.cloud.bigquery.storage.v1.ArrowSchema; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.time.LocalDateTime; import java.util.ArrayList; @@ -50,17 +49,34 @@ public interface ArrowBatchConsumer { void accept(VectorSchemaRoot root); } + static class ArrowTimestampBatchConsumer implements ArrowBatchConsumer { + private final List expectedTimestampValues; + + ArrowTimestampBatchConsumer(List expectedTimestampValues) { + this.expectedTimestampValues = expectedTimestampValues; + } + + @Override + public void accept(VectorSchemaRoot root) { + FieldVector timestampFieldVector = root.getVector("timestamp"); + int count = timestampFieldVector.getValueCount(); + for (int i = 0; i < count; i++) { + long value = (Long) timestampFieldVector.getObject(i); + assertThat(value).isEqualTo(expectedTimestampValues.get(i)); + } + } + } + /** ArrowRangeBatchConsumer accepts batch Arrow data and validate the range values. 
*/ public static class ArrowRangeBatchConsumer implements ArrowBatchConsumer { - - private final ImmutableMap expectedRangeDateValues; - private final ImmutableMap expectedRangeDatetimeValues; - private final ImmutableMap expectedRangeTimestampValues; + private final Map expectedRangeDateValues; + private final Map expectedRangeDatetimeValues; + private final Map expectedRangeTimestampValues; public ArrowRangeBatchConsumer( - ImmutableMap expectedRangeDateValues, - ImmutableMap expectedRangeDatetimeValues, - ImmutableMap expectedRangeTimestampValues) { + Map expectedRangeDateValues, + Map expectedRangeDatetimeValues, + Map expectedRangeTimestampValues) { this.expectedRangeDateValues = expectedRangeDateValues; this.expectedRangeDatetimeValues = expectedRangeDatetimeValues; this.expectedRangeTimestampValues = expectedRangeTimestampValues; From 987a0e74a01a733577101d23ccdcbc68e03632d3 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 15 Dec 2025 20:11:30 -0500 Subject: [PATCH 2/8] chore: Add timestamp test for avro --- .../it/ITBigQueryStorageReadClientTest.java | 154 ++++++++++-------- 1 file changed, 86 insertions(+), 68 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java index c468bde8d1..e09a11b379 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java @@ -106,6 +106,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -126,6 +127,7 @@ public class ITBigQueryStorageReadClientTest { private static final String DESCRIPTION = "BigQuery Storage Java client test dataset"; private static final int SHAKESPEARE_SAMPLE_ROW_COUNT = 164_656; private static final int MAX_STREAM_COUNT = 1; + private static final String BQSTORAGE_TIMESTAMP_READ_TABLE = "bqstorage_timestamp_read"; private static BigQueryReadClient client; private static String projectName; @@ -210,6 +212,14 @@ public class ITBigQueryStorageReadClientTest { + " \"universe_domain\": \"fake.domain\"\n" + "}"; + private static final Long[] EXPECTED_TIMESTAMPS_MICROS = + new Long[] { + 1735734896123456L, // 2025-01-01T12:34:56.123456Z + 1580646896123456L, // 2020-02-02T12:34:56.123456Z + 636467696123456L, // 1990-03-03T12:34:56.123456Z + 165846896123456L // 1975-04-04T12:34:56.123456Z + }; + private static final com.google.cloud.bigquery.Schema RANGE_SCHEMA = com.google.cloud.bigquery.Schema.of( Field.newBuilder("name", StandardSQLTypeName.STRING) @@ -499,7 +509,8 @@ public CompletableResultCode shutdown() { } @BeforeClass - public static void beforeClass() throws IOException { + public static void beforeClass() + throws IOException, DescriptorValidationException, InterruptedException { client = BigQueryReadClient.create(); projectName = ServiceOptions.getDefaultProjectId(); parentProjectId = String.format("projects/%s", projectName); @@ -517,6 +528,57 @@ public static void beforeClass() throws IOException { .build(); bigquery.create(datasetInfo); LOG.info("Created test dataset: " + DATASET); + + setupTimestampTable(); + } + + private 
static void setupTimestampTable() + throws DescriptorValidationException, IOException, InterruptedException { + com.google.cloud.bigquery.Schema timestampSchema = + com.google.cloud.bigquery.Schema.of( + Field.newBuilder("timestamp", StandardSQLTypeName.TIMESTAMP) + .setMode(Mode.NULLABLE) + .build()); + + TableSchema timestampTableSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("timestamp") + .setType(TableFieldSchema.Type.TIMESTAMP) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .build(); + + // Create table with Range fields. + TableId tableId = TableId.of(DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE); + bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(timestampSchema))); + + TableName parentTable = TableName.of(projectName, DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE); + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelayDuration(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelayDuration(Duration.ofSeconds(10)) + .build(); + + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(parentTable.toString(), timestampTableSchema) + .setRetrySettings(retrySettings) + .build()) { + JSONArray data = new JSONArray(); + for (long timestampMicro : EXPECTED_TIMESTAMPS_MICROS) { + JSONObject row = new JSONObject(); + row.put("timestamp", timestampMicro); + data.put(row); + } + + ApiFuture future = writer.append(data); + // The append method is asynchronous. Rather than waiting for the method to complete, + // which can hurt performance, register a completion callback and continue streaming. + ApiFutures.addCallback(future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); + } } @AfterClass @@ -803,64 +865,9 @@ public void testRangeTypeWrite() } @Test - public void timestamp_readArrow() - throws InterruptedException, IOException, DescriptorValidationException { - com.google.cloud.bigquery.Schema timestampSchema = - com.google.cloud.bigquery.Schema.of( - Field.newBuilder("timestamp", StandardSQLTypeName.TIMESTAMP) - .setMode(Mode.NULLABLE) - .build()); - - TableSchema timestampTableSchema = - TableSchema.newBuilder() - .addFields( - TableFieldSchema.newBuilder() - .setName("timestamp") - .setType(TableFieldSchema.Type.TIMESTAMP) - .setMode(TableFieldSchema.Mode.NULLABLE) - .build()) - .build(); - - // Create table with Range fields. 
- String tableName = "test_timestamp_read_arrow"; - TableId tableId = TableId.of(DATASET, tableName); - bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(timestampSchema))); - - TableName parentTable = TableName.of(projectName, DATASET, tableName); - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelayDuration(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(5) - .setMaxRetryDelayDuration(Duration.ofSeconds(10)) - .build(); - - Long[] timestampsMicros = - new Long[] { - 1735734896123456L, // 2025-01-01T12:34:56.123456Z - 1580646896123456L, // 2020-02-02T12:34:56.123456Z - 636467696123456L, // 1990-03-03T12:34:56.123456Z - 165846896123456L // 1975-04-04T12:34:56.123456Z - }; - - try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(parentTable.toString(), timestampTableSchema) - .setRetrySettings(retrySettings) - .build()) { - JSONArray data = new JSONArray(); - for (long timestampMicro : timestampsMicros) { - JSONObject row = new JSONObject(); - row.put("timestamp", timestampMicro); - data.put(row); - } - - ApiFuture future = writer.append(data); - // The append method is asynchronous. Rather than waiting for the method to complete, - // which can hurt performance, register a completion callback and continue streaming. - ApiFutures.addCallback(future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); - } - - String table = BigQueryResource.formatTableResource(projectName, DATASET, tableId.getTable()); + public void timestamp_readArrow() throws IOException { + String table = + BigQueryResource.formatTableResource(projectName, DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE); ReadSession session = client.createReadSession( parentProjectId, @@ -894,10 +901,23 @@ public void timestamp_readArrow() Preconditions.checkState(response.hasArrowRecordBatch()); reader.processRows( response.getArrowRecordBatch(), - new SimpleRowReaderArrow.ArrowTimestampBatchConsumer(Arrays.asList(timestampsMicros))); + new SimpleRowReaderArrow.ArrowTimestampBatchConsumer( + Arrays.asList(EXPECTED_TIMESTAMPS_MICROS))); rowCount += response.getRowCount(); } - assertEquals(timestampsMicros.length, rowCount); + assertEquals(EXPECTED_TIMESTAMPS_MICROS.length, rowCount); + } + } + + @Test + public void timestamp_readAvro() throws IOException { + String table = + BigQueryResource.formatTableResource(projectName, DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE); + List rows = readAllRows(table, null); + List timestamps = + rows.stream().map(x -> (Long) x.get("timestamp")).collect(Collectors.toList()); + for (int i = 0; i < timestamps.size(); i++) { + assertEquals(EXPECTED_TIMESTAMPS_MICROS[i], timestamps.get(i)); } } @@ -1892,13 +1912,11 @@ List readAllRows(String table, String filter) throws IOExcep /* table= */ table, /* snapshotInMillis= */ null, /* filter= */ filter, - new AvroRowConsumer() { - @Override - public void accept(GenericData.Record record) { - // clone the record since that reference will be reused by the reader. - rows.add(new GenericRecordBuilder(record).build()); - } - }); + (AvroRowConsumer) + record -> { + // clone the record since that reference will be reused by the reader. 
+ rows.add(new GenericRecordBuilder(record).build()); + }); return rows; } From dbf782ff8940e4e8c46039b45920426cb722dc8f Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 16 Dec 2025 00:03:28 -0500 Subject: [PATCH 3/8] chore: Refactor packages into util package --- .../it/ITBigQueryStorageLongRunningTest.java | 2 + .../it/ITBigQueryStorageReadClientTest.java | 259 ++++++------------ .../v1/it/ITBigQueryWriteClientTest.java | 198 ++++++++++--- .../it/ITBigQueryWriteNonQuotaRetryTest.java | 1 + .../v1/it/ITBigQueryWriteQuotaRetryTest.java | 1 + .../v1/it/{ => util}/BigQueryResource.java | 2 +- .../bigquery/storage/v1/it/util/Helper.java | 167 +++++++++++ .../it/{ => util}/SimpleRowReaderArrow.java | 6 +- .../v1/it/{ => util}/SimpleRowReaderAvro.java | 2 +- .../v1/it/{ => util}/WriteRetryTestUtil.java | 2 +- .../v1beta2/it/ITBigQueryStorageTest.java | 4 +- 11 files changed, 421 insertions(+), 223 deletions(-) rename google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/{ => util}/BigQueryResource.java (95%) create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java rename google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/{ => util}/SimpleRowReaderArrow.java (97%) rename google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/{ => util}/SimpleRowReaderAvro.java (97%) rename google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/{ => util}/WriteRetryTestUtil.java (99%) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java index 6626a6cb00..1fd39ec5e0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; + +import com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource; import org.junit.AfterClass; import org.junit.Assume; import org.junit.BeforeClass; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java index e09a11b379..ad827e930d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java @@ -27,14 +27,12 @@ import static org.junit.Assert.fail; import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnauthenticatedException; -import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.RetryOption; import com.google.cloud.ServiceOptions; import 
com.google.cloud.bigquery.BigQuery; @@ -63,20 +61,22 @@ import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; import com.google.cloud.bigquery.storage.v1.ReadSession; -import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers; import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions; import com.google.cloud.bigquery.storage.v1.ReadStream; import com.google.cloud.bigquery.storage.v1.TableFieldSchema; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.TableSchema; -import com.google.cloud.bigquery.storage.v1.it.SimpleRowReaderArrow.ArrowRangeBatchConsumer; -import com.google.cloud.bigquery.storage.v1.it.SimpleRowReaderAvro.AvroRowConsumer; +import com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource; +import com.google.cloud.bigquery.storage.v1.it.util.Helper; +import com.google.cloud.bigquery.storage.v1.it.util.SimpleRowReaderArrow; +import com.google.cloud.bigquery.storage.v1.it.util.SimpleRowReaderArrow.ArrowRangeBatchConsumer; +import com.google.cloud.bigquery.storage.v1.it.util.SimpleRowReaderAvro; +import com.google.cloud.bigquery.storage.v1.it.util.SimpleRowReaderAvro.AvroRowConsumer; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Descriptors.DescriptorValidationException; -import com.google.protobuf.Timestamp; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -85,9 +85,7 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.Duration; @@ -111,7 +109,6 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.util.Utf8; import org.json.JSONArray; import org.json.JSONObject; @@ -125,11 +122,14 @@ public class ITBigQueryStorageReadClientTest { Logger.getLogger(ITBigQueryStorageReadClientTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String DESCRIPTION = "BigQuery Storage Java client test dataset"; + private static final String BQSTORAGE_TIMESTAMP_READ_TABLE = "bqstorage_timestamp_read"; + private static final int SHAKESPEARE_SAMPLE_ROW_COUNT = 164_656; + private static final int SHAKESPEARE_SAMPELS_ROWS_MORE_THAN_100_WORDS = 1_333; + private static final int MAX_STREAM_COUNT = 1; - private static final String BQSTORAGE_TIMESTAMP_READ_TABLE = "bqstorage_timestamp_read"; - private static BigQueryReadClient client; + private static BigQueryReadClient readClient; private static String projectName; private static String parentProjectId; private static BigQuery bigquery; @@ -212,14 +212,6 @@ public class ITBigQueryStorageReadClientTest { + " \"universe_domain\": \"fake.domain\"\n" + "}"; - private static final Long[] EXPECTED_TIMESTAMPS_MICROS = - new Long[] { - 1735734896123456L, // 2025-01-01T12:34:56.123456Z - 1580646896123456L, // 2020-02-02T12:34:56.123456Z - 
636467696123456L, // 1990-03-03T12:34:56.123456Z - 165846896123456L // 1975-04-04T12:34:56.123456Z - }; - private static final com.google.cloud.bigquery.Schema RANGE_SCHEMA = com.google.cloud.bigquery.Schema.of( Field.newBuilder("name", StandardSQLTypeName.STRING) @@ -511,7 +503,7 @@ public CompletableResultCode shutdown() { @BeforeClass public static void beforeClass() throws IOException, DescriptorValidationException, InterruptedException { - client = BigQueryReadClient.create(); + readClient = BigQueryReadClient.create(); projectName = ServiceOptions.getDefaultProjectId(); parentProjectId = String.format("projects/%s", projectName); @@ -568,7 +560,7 @@ private static void setupTimestampTable() .setRetrySettings(retrySettings) .build()) { JSONArray data = new JSONArray(); - for (long timestampMicro : EXPECTED_TIMESTAMPS_MICROS) { + for (long timestampMicro : Helper.INPUT_TIMESTAMPS_MICROS) { JSONObject row = new JSONObject(); row.put("timestamp", timestampMicro); data.put(row); @@ -577,15 +569,16 @@ private static void setupTimestampTable() ApiFuture future = writer.append(data); // The append method is asynchronous. Rather than waiting for the method to complete, // which can hurt performance, register a completion callback and continue streaming. - ApiFutures.addCallback(future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); + ApiFutures.addCallback( + future, new Helper.AppendCompleteCallback(), MoreExecutors.directExecutor()); } } @AfterClass public static void afterClass() throws InterruptedException { - if (client != null) { - client.close(); - client.awaitTermination(10, TimeUnit.SECONDS); + if (readClient != null) { + readClient.close(); + readClient.awaitTermination(10, TimeUnit.SECONDS); } if (bigquery != null) { @@ -603,7 +596,7 @@ public void testSimpleReadAvro() { /* tableId= */ "shakespeare"); ReadSession session = - client.createReadSession( + readClient.createReadSession( /* parent= */ parentProjectId, /* readSession= */ ReadSession.newBuilder() .setTable(table) @@ -622,7 +615,7 @@ public void testSimpleReadAvro() { ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); long rowCount = 0; - ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { rowCount += response.getRowCount(); } @@ -639,7 +632,7 @@ public void testSimpleReadArrow() { /* tableId= */ "shakespeare"); ReadSession session = - client.createReadSession( + readClient.createReadSession( /* parent= */ parentProjectId, /* readSession= */ ReadSession.newBuilder() .setTable(table) @@ -668,7 +661,7 @@ public void testSimpleReadArrow() { long rowCount = 0; // Process each block of rows as they arrive and decode using our simple row reader. 
- ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { Preconditions.checkState(response.hasArrowRecordBatch()); rowCount += response.getRowCount(); @@ -703,7 +696,7 @@ public void testRangeTypeSimple() throws InterruptedException { /* tableId= */ tableId.getTable()); ReadSession session = - client.createReadSession( + readClient.createReadSession( /* parent= */ parentProjectId, /* readSession= */ ReadSession.newBuilder() .setTable(table) @@ -731,7 +724,7 @@ public void testRangeTypeSimple() throws InterruptedException { ReadRowsRequest.newBuilder().setReadStream(streamName).build(); long rowCount = 0; - ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { Preconditions.checkState(response.hasArrowRecordBatch()); rowCount += response.getRowCount(); @@ -802,7 +795,8 @@ public void testRangeTypeWrite() ApiFuture future = writer.append(data); // The append method is asynchronous. Rather than waiting for the method to complete, // which can hurt performance, register a completion callback and continue streaming. - ApiFutures.addCallback(future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); + ApiFutures.addCallback( + future, new Helper.AppendCompleteCallback(), MoreExecutors.directExecutor()); } String table = @@ -811,7 +805,7 @@ public void testRangeTypeWrite() /* datasetId= */ DATASET, /* tableId= */ tableId.getTable()); ReadSession session = - client.createReadSession( + readClient.createReadSession( /* parent= */ parentProjectId, /* readSession= */ ReadSession.newBuilder() .setTable(table) @@ -849,7 +843,7 @@ public void testRangeTypeWrite() long rowCount = 0; // Process each block of rows as they arrive and decode using our simple row reader. - ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { Preconditions.checkState(response.hasArrowRecordBatch()); reader.processRows( @@ -869,7 +863,7 @@ public void timestamp_readArrow() throws IOException { String table = BigQueryResource.formatTableResource(projectName, DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE); ReadSession session = - client.createReadSession( + readClient.createReadSession( parentProjectId, ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.ARROW).build(), MAX_STREAM_COUNT); @@ -896,16 +890,16 @@ public void timestamp_readArrow() throws IOException { long rowCount = 0; // Process each block of rows as they arrive and decode using our simple row reader. 
- ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { Preconditions.checkState(response.hasArrowRecordBatch()); reader.processRows( response.getArrowRecordBatch(), new SimpleRowReaderArrow.ArrowTimestampBatchConsumer( - Arrays.asList(EXPECTED_TIMESTAMPS_MICROS))); + Arrays.asList(Helper.INPUT_TIMESTAMPS_MICROS))); rowCount += response.getRowCount(); } - assertEquals(EXPECTED_TIMESTAMPS_MICROS.length, rowCount); + assertEquals(Helper.EXPECTED_TIMESTAMPS_MICROS.length, rowCount); } } @@ -913,11 +907,11 @@ public void timestamp_readArrow() throws IOException { public void timestamp_readAvro() throws IOException { String table = BigQueryResource.formatTableResource(projectName, DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE); - List rows = readAllRows(table, null); + List rows = Helper.readAllRows(readClient, parentProjectId, table, null); List timestamps = rows.stream().map(x -> (Long) x.get("timestamp")).collect(Collectors.toList()); for (int i = 0; i < timestamps.size(); i++) { - assertEquals(EXPECTED_TIMESTAMPS_MICROS[i], timestamps.get(i)); + assertEquals(Helper.EXPECTED_TIMESTAMPS_MICROS[i], timestamps.get(i)); } } @@ -930,7 +924,7 @@ public void testSimpleReadAndResume() { /* tableId= */ "shakespeare"); ReadSession session = - client.createReadSession( + readClient.createReadSession( /* parent= */ parentProjectId, /* readSession= */ ReadSession.newBuilder() .setTable(table) @@ -955,7 +949,7 @@ public void testSimpleReadAndResume() { .setOffset(rowCount) .build(); - ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { rowCount += response.getRowCount(); @@ -989,7 +983,7 @@ public void testFilter() throws IOException { .build()) .build(); - ReadSession session = client.createReadSession(request); + ReadSession session = readClient.createReadSession(request); assertEquals( String.format( "Did not receive expected number of streams for table '%s' CreateReadSession" @@ -1006,7 +1000,7 @@ public void testFilter() throws IOException { long rowCount = 0; - ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { rowCount += response.getRowCount(); reader.processRows( @@ -1050,7 +1044,7 @@ public void testColumnSelection() throws IOException { .build()) .build(); - ReadSession session = client.createReadSession(request); + ReadSession session = readClient.createReadSession(request); assertEquals( String.format( "Did not receive expected number of streams for table '%s' CreateReadSession" @@ -1081,7 +1075,7 @@ public void testColumnSelection() throws IOException { SimpleRowReaderAvro reader = new SimpleRowReaderAvro(avroSchema); long rowCount = 0; - ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { rowCount += response.getRowCount(); reader.processRows( @@ -1099,7 +1093,7 @@ record -> { }); } - assertEquals(1_333, rowCount); + assertEquals(SHAKESPEARE_SAMPELS_ROWS_MORE_THAN_100_WORDS, rowCount); } @Test @@ -1130,7 +1124,9 @@ public void testReadAtSnapshot() throws InterruptedException, IOException { /* tableId= */ testTableId.getTable()); final List 
rowsAfterFirstSnapshot = new ArrayList<>(); - processRowsAtSnapshot( + Helper.processRowsAtSnapshot( + readClient, + parentProjectId, /* table= */ table, /* snapshotInMillis= */ firstJob.getStatistics().getEndTime(), /* filter= */ null, @@ -1143,7 +1139,9 @@ public void accept(GenericData.Record record) { assertEquals(Collections.singletonList(1L), rowsAfterFirstSnapshot); final List rowsAfterSecondSnapshot = new ArrayList<>(); - processRowsAtSnapshot( + Helper.processRowsAtSnapshot( + readClient, + parentProjectId, /* table= */ table, /* snapshotInMillis= */ secondJob.getStatistics().getEndTime(), /* filter= */ null, @@ -1183,11 +1181,16 @@ public void testColumnPartitionedTableByDateField() throws InterruptedException, /* datasetId= */ DATASET, /* tableId= */ partitionedTableName); - List unfilteredRows = readAllRows(/* table= */ table, /* filter= */ null); + List unfilteredRows = + Helper.readAllRows(readClient, parentProjectId, /* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + unfilteredRows.toString(), 3, unfilteredRows.size()); List partitionFilteredRows = - readAllRows(/* table= */ table, /* filter= */ "date_field = CAST(\"2019-01-02\" AS DATE)"); + Helper.readAllRows( + readClient, + parentProjectId, + /* table= */ table, + /* filter= */ "date_field = CAST(\"2019-01-02\" AS DATE)"); assertEquals( "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size()); assertEquals(2L, partitionFilteredRows.get(0).get("num_field")); @@ -1231,11 +1234,16 @@ public void testIngestionTimePartitionedTable() throws InterruptedException, IOE /* datasetId= */ testTableId.getDataset(), /* tableId= */ testTableId.getTable()); - List unfilteredRows = readAllRows(/* table= */ table, /* filter= */ null); + List unfilteredRows = + Helper.readAllRows(readClient, parentProjectId, /* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + unfilteredRows.toString(), 2, unfilteredRows.size()); List partitionFilteredRows = - readAllRows(/* table= */ table, /* filter= */ "_PARTITIONDATE > \"2019-01-01\""); + Helper.readAllRows( + readClient, + parentProjectId, + /* table= */ table, + /* filter= */ "_PARTITIONDATE > \"2019-01-01\""); assertEquals( "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size()); assertEquals(2L, partitionFilteredRows.get(0).get("num_field")); @@ -1272,7 +1280,8 @@ public void testBasicSqlTypes() throws InterruptedException, IOException { BigQueryResource.formatTableResource( /* projectId= */ projectName, /* datasetId= */ DATASET, /* tableId= */ tableName); - List rows = readAllRows(/* table= */ table, /* filter= */ null); + List rows = + Helper.readAllRows(readClient, parentProjectId, /* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -1369,7 +1378,8 @@ public void testDateAndTimeSqlTypes() throws InterruptedException, IOException { BigQueryResource.formatTableResource( /* projectId= */ projectName, /* datasetId= */ DATASET, /* tableId= */ tableName); - List rows = readAllRows(/* table= */ table, /* filter= */ null); + List rows = + Helper.readAllRows(readClient, parentProjectId, /* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -1464,7 +1474,8 @@ public void testGeographySqlType() throws InterruptedException, IOException { BigQueryResource.formatTableResource( /* projectId= */ 
projectName, /* datasetId= */ DATASET, /* tableId= */ tableName); - List rows = readAllRows(/* table= */ table, /* filter= */ null); + List rows = + Helper.readAllRows(readClient, parentProjectId, /* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -1507,7 +1518,8 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio BigQueryResource.formatTableResource( /* projectId= */ projectName, /* datasetId= */ DATASET, /* tableId= */ tableName); - List rows = readAllRows(/* table= */ table, /* filter= */ null); + List rows = + Helper.readAllRows(readClient, parentProjectId, /* table= */ table, /* filter= */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -1561,14 +1573,14 @@ public void testSimpleReadWithBackgroundExecutorProvider() throws IOException { InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build()) .build(); // Overriding the default client - client = BigQueryReadClient.create(bigQueryReadSettings); + readClient = BigQueryReadClient.create(bigQueryReadSettings); assertTrue( - client.getStub().getStubSettings().getBackgroundExecutorProvider() + readClient.getStub().getStubSettings().getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider); assertEquals( 14, ((InstantiatingExecutorProvider) - client.getStub().getStubSettings().getBackgroundExecutorProvider()) + readClient.getStub().getStubSettings().getBackgroundExecutorProvider()) .getExecutorThreadCount()); String table = BigQueryResource.formatTableResource( @@ -1577,7 +1589,7 @@ public void testSimpleReadWithBackgroundExecutorProvider() throws IOException { /* tableId= */ "shakespeare"); ReadSession session = - client.createReadSession( + readClient.createReadSession( /* parent= */ parentProjectId, /* readSession= */ ReadSession.newBuilder() .setTable(table) @@ -1596,7 +1608,7 @@ public void testSimpleReadWithBackgroundExecutorProvider() throws IOException { ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); long rowCount = 0; - ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { rowCount += response.getRowCount(); } @@ -1609,7 +1621,8 @@ public void testUniverseDomainWithInvalidUniverseDomain() throws IOException { BigQueryReadSettings bigQueryReadSettings = BigQueryReadSettings.newBuilder() .setCredentialsProvider( - FixedCredentialsProvider.create(loadCredentials(FAKE_JSON_CRED_WITH_GOOGLE_DOMAIN))) + FixedCredentialsProvider.create( + Helper.loadCredentials(FAKE_JSON_CRED_WITH_GOOGLE_DOMAIN))) .setUniverseDomain("invalid.domain") .build(); BigQueryReadClient localClient = BigQueryReadClient.create(bigQueryReadSettings); @@ -1644,7 +1657,7 @@ public void testInvalidUniverseDomainWithMismatchCredentials() throws IOExceptio BigQueryReadSettings.newBuilder() .setCredentialsProvider( FixedCredentialsProvider.create( - loadCredentials(FAKE_JSON_CRED_WITH_INVALID_DOMAIN))) + Helper.loadCredentials(FAKE_JSON_CRED_WITH_INVALID_DOMAIN))) .setUniverseDomain("invalid.domain") .build(); BigQueryReadClient localClient = BigQueryReadClient.create(bigQueryReadSettings); @@ -1699,7 +1712,7 @@ public void testUniverseDomainWithMatchingDomain() throws IOException { 
ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); long rowCount = 0; - ServerStream stream = client.readRowsCallable().call(readRowsRequest); + ServerStream stream = readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { rowCount += response.getRowCount(); } @@ -1824,7 +1837,8 @@ private long readStreamToOffset(ReadStream readStream, long rowOffset) { ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build(); long rowCount = 0; - ServerStream serverStream = client.readRowsCallable().call(readRowsRequest); + ServerStream serverStream = + readClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : serverStream) { rowCount += response.getRowCount(); @@ -1836,90 +1850,6 @@ private long readStreamToOffset(ReadStream readStream, long rowOffset) { return rowCount; } - /** - * Reads all the rows from the specified table. - * - *
For every row, the consumer is called for processing. - * - * @param table - * @param snapshotInMillis Optional. If specified, all rows up to timestamp will be returned. - * @param filter Optional. If specified, it will be used to restrict returned data. - * @param consumer that receives all Avro rows. - * @throws IOException - */ - private void processRowsAtSnapshot( - String table, Long snapshotInMillis, String filter, AvroRowConsumer consumer) - throws IOException { - Preconditions.checkNotNull(table); - Preconditions.checkNotNull(consumer); - - CreateReadSessionRequest.Builder createSessionRequestBuilder = - CreateReadSessionRequest.newBuilder() - .setParent(parentProjectId) - .setMaxStreamCount(1) - .setReadSession( - ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.AVRO).build()); - - if (snapshotInMillis != null) { - Timestamp snapshotTimestamp = - Timestamp.newBuilder() - .setSeconds(snapshotInMillis / 1_000) - .setNanos((int) ((snapshotInMillis % 1000) * 1000000)) - .build(); - createSessionRequestBuilder - .getReadSessionBuilder() - .setTableModifiers( - TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build()); - } - - if (filter != null && !filter.isEmpty()) { - createSessionRequestBuilder - .getReadSessionBuilder() - .setReadOptions(TableReadOptions.newBuilder().setRowRestriction(filter).build()); - } - - ReadSession session = client.createReadSession(createSessionRequestBuilder.build()); - assertEquals( - String.format( - "Did not receive expected number of streams for table '%s' CreateReadSession" - + " response:%n%s", - table, session.toString()), - 1, - session.getStreamsCount()); - - ReadRowsRequest readRowsRequest = - ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); - - SimpleRowReaderAvro reader = - new SimpleRowReaderAvro(new Schema.Parser().parse(session.getAvroSchema().getSchema())); - - ServerStream stream = client.readRowsCallable().call(readRowsRequest); - for (ReadRowsResponse response : stream) { - reader.processRows(response.getAvroRows(), consumer); - } - } - - /** - * Reads all the rows from the specified table and returns a list as generic Avro records. - * - * @param table - * @param filter Optional. If specified, it will be used to restrict returned data. - * @return - */ - List readAllRows(String table, String filter) throws IOException { - final List rows = new ArrayList<>(); - processRowsAtSnapshot( - /* table= */ table, - /* snapshotInMillis= */ null, - /* filter= */ filter, - (AvroRowConsumer) - record -> { - // clone the record since that reference will be reused by the reader. - rows.add(new GenericRecordBuilder(record).build()); - }); - return rows; - } - /** * Runs a query job with WRITE_APPEND disposition to the destination table and returns the * successfully completed job. 
@@ -1963,33 +1893,4 @@ private Job runQueryJobAndExpectSuccess(QueryJobConfiguration configuration) return completedJob; } - - static ServiceAccountCredentials loadCredentials(String credentialFile) { - try (InputStream keyStream = new ByteArrayInputStream(credentialFile.getBytes())) { - return ServiceAccountCredentials.fromStream(keyStream); - } catch (IOException e) { - fail("Couldn't create fake JSON credentials."); - } - return null; - } - - static class AppendCompleteCallback implements ApiFutureCallback { - private static final Object lock = new Object(); - private static int batchCount = 0; - - public void onSuccess(AppendRowsResponse response) { - synchronized (lock) { - if (response.hasError()) { - System.out.format("Error: %s\n", response.getError()); - } else { - ++batchCount; - System.out.format("Wrote batch %d\n", batchCount); - } - } - } - - public void onFailure(Throwable throwable) { - System.out.format("Error: %s\n", throwable.toString()); - } - } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java index 7af2eece1e..5e1b1068fc 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java @@ -41,6 +41,8 @@ import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException; import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; +import com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource; +import com.google.cloud.bigquery.storage.v1.it.util.Helper; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; @@ -65,6 +67,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.*; @@ -75,6 +78,7 @@ import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.avro.generic.GenericData; import org.json.JSONArray; import org.json.JSONObject; import org.junit.AfterClass; @@ -94,7 +98,10 @@ public class ITBigQueryWriteClientTest { private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; - private static BigQueryWriteClient client; + private static BigQueryReadClient readClient; + private static BigQueryWriteClient writeClient; + private static String projectName; + private static String parentProjectId; private static TableInfo tableInfo; private static TableInfo tableInfo2; @@ -126,9 +133,13 @@ public StringWithSecondsNanos(String fooParam, long secondsParam, int nanosParam @BeforeClass public static void beforeClass() throws IOException { + readClient = BigQueryReadClient.create(); + BigQueryWriteSettings settings = BigQueryWriteSettings.newBuilder().setHeaderProvider(USER_AGENT_HEADER_PROVIDER).build(); - client = BigQueryWriteClient.create(settings); + writeClient = BigQueryWriteClient.create(settings); + projectName = 
ServiceOptions.getDefaultProjectId(); + parentProjectId = String.format("projects/%s", projectName); RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); bigquery = bigqueryHelper.getOptions().getService(); @@ -217,9 +228,14 @@ public static void beforeClass() throws IOException { @AfterClass public static void afterClass() throws InterruptedException { - if (client != null) { - client.close(); - client.awaitTermination(10, TimeUnit.SECONDS); + if (writeClient != null) { + writeClient.close(); + writeClient.awaitTermination(10, TimeUnit.SECONDS); + } + + if (readClient != null) { + readClient.close(); + readClient.awaitTermination(10, TimeUnit.SECONDS); } if (bigquery != null) { @@ -303,7 +319,7 @@ ProtoRows createProtoRowsMixed(StringWithSecondsNanos[] messages) { public void testBatchWriteWithCommittedStreamEU() throws IOException, InterruptedException, ExecutionException { WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableIdEU) .setWriteStream( @@ -333,7 +349,7 @@ public void testBatchWriteWithCommittedStreamEU() public void testProto3OptionalBatchWriteWithCommittedStream() throws IOException, InterruptedException, ExecutionException { WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableId) .setWriteStream( @@ -385,7 +401,7 @@ public void testJsonStreamWriterCommittedStream() bigquery.create(tableInfo); TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( @@ -555,7 +571,7 @@ public void testRequestProfilerWithCommittedStream() TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( @@ -635,7 +651,7 @@ public void testJsonStreamWriterWithDefaultSchema() // Create JsonStreamWriter with newBuilder(streamOrTable, client) try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(parent.toString(), client) + JsonStreamWriter.newBuilder(parent.toString(), writeClient) .setIgnoreUnknownFields(true) .build()) { LOG.info("Sending one message"); @@ -729,7 +745,7 @@ public void testJsonStreamWriterWithDefaultSchemaNoTable() { // Create JsonStreamWriter with newBuilder(streamOrTable, client) try (JsonStreamWriter ignore = - JsonStreamWriter.newBuilder(parent.toString(), client) + JsonStreamWriter.newBuilder(parent.toString(), writeClient) .setIgnoreUnknownFields(true) .build()) { // Do nothing @@ -920,7 +936,7 @@ public void testJsonDefaultStreamOnTableWithDefaultValue_SchemaNotGiven() TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build(); bigquery.create(tableInfo); try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(defaultTableId, client) + JsonStreamWriter.newBuilder(defaultTableId, writeClient) .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE) .build()) { testJsonStreamWriterForDefaultValue(jsonStreamWriter); @@ -943,7 +959,7 @@ public void testJsonExclusiveStreamOnTableWithDefaultValue_GiveTableSchema() TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build(); 
bigquery.create(tableInfo); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(exclusiveTableId) .setWriteStream( @@ -1180,7 +1196,7 @@ private void testArrowIngestion(boolean serializedInput) } if (serializedInput) { try (StreamWriter streamWriter = - StreamWriter.newBuilder(tableId + "/_default", client) + StreamWriter.newBuilder(tableId + "/_default", writeClient) .setWriterSchema(v1ArrowSchema) .setTraceId(TEST_TRACE_ID) .setMaxRetryDuration(java.time.Duration.ofSeconds(5)) @@ -1197,7 +1213,7 @@ private void testArrowIngestion(boolean serializedInput) } } else { try (StreamWriter streamWriter = - StreamWriter.newBuilder(tableId + "/_default", client) + StreamWriter.newBuilder(tableId + "/_default", writeClient) .setWriterSchema(arrowSchema) .setTraceId(TEST_TRACE_ID) .setMaxRetryDuration(java.time.Duration.ofSeconds(5)) @@ -1248,7 +1264,7 @@ public void testJsonStreamWriterWithMessagesOver10M() TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( @@ -1296,14 +1312,14 @@ public void testJsonStreamWriterSchemaUpdate() bigquery.create(tableInfo); TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) .build()); try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) { + JsonStreamWriter.newBuilder(writeStream.getName(), writeClient).build()) { // write the 1st row JSONObject foo = new JSONObject(); foo.put("col1", "aaa"); @@ -1381,7 +1397,7 @@ public void testJsonStreamWriterSchemaUpdateConcurrent() bigquery.create(tableInfo); TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( @@ -1413,7 +1429,7 @@ public void testJsonStreamWriterSchemaUpdateConcurrent() // Start writing using the JsonWriter try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) { + JsonStreamWriter.newBuilder(writeStream.getName(), writeClient).build()) { int numberOfThreads = 5; CountDownLatch latch; AtomicInteger next; @@ -1505,7 +1521,7 @@ public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap() bigquery.create(tableInfo); TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( @@ -1518,7 +1534,7 @@ public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap() "date_with_default_to_current", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), client) + JsonStreamWriter.newBuilder(writeStream.getName(), writeClient) .setMissingValueInterpretationMap(missingValueMap) .build()) { // Verify the 
missing value map @@ -1644,7 +1660,7 @@ public void testJsonStreamWriterWithFlexibleColumnName() bigquery.create(tableInfo); TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( @@ -1737,7 +1753,7 @@ public void testJsonStreamWriterWithNestedFlexibleColumnName() bigquery.create(tableInfo); TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( @@ -1814,14 +1830,14 @@ public void testJsonStreamWriterSchemaUpdateWithFlexibleColumnName() bigquery.create(tableInfo); TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(parent.toString()) .setWriteStream( WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) .build()); try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) { + JsonStreamWriter.newBuilder(writeStream.getName(), writeClient).build()) { // write the 1st row JSONObject foo = new JSONObject(); foo.put("col1-列", "aaa"); @@ -1891,7 +1907,7 @@ public void testComplicateSchemaWithPendingStream() throws IOException, InterruptedException, ExecutionException { LOG.info("Create a write stream"); WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableId2) .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build()) @@ -1919,7 +1935,7 @@ public void testComplicateSchemaWithPendingStream() LOG.info("Finalize a write stream"); finalizeResponse = - client.finalizeWriteStream( + writeClient.finalizeWriteStream( FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build()); ApiFuture response3 = @@ -1934,7 +1950,7 @@ public void testComplicateSchemaWithPendingStream() assertEquals(2, finalizeResponse.getRowCount()); LOG.info("Commit a write stream"); BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse = - client.batchCommitWriteStreams( + writeClient.batchCommitWriteStreams( BatchCommitWriteStreamsRequest.newBuilder() .setParent(tableId2) .addWriteStreams(writeStream.getName()) @@ -1961,7 +1977,7 @@ public void testComplicateSchemaWithPendingStream() @Test public void testStreamError() throws IOException, InterruptedException, ExecutionException { WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableId) .setWriteStream( @@ -1995,7 +2011,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio @Test public void testStreamSchemaMisMatchError() throws IOException, InterruptedException { WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableId) .setWriteStream( @@ -2027,7 +2043,7 @@ public void testStreamSchemaMisMatchError() throws IOException, InterruptedExcep public void testStreamFinalizedError() throws IOException, InterruptedException, ExecutionException { WriteStream writeStream = - client.createWriteStream( 
+ writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableId) .setWriteStream( @@ -2042,7 +2058,7 @@ public void testStreamFinalizedError() streamWriter.append(createProtoRowsMultipleColumns(new String[] {"a"}), /* offset= */ 0); response.get(); // Finalize the stream in order to trigger STREAM_FINALIZED error - client.finalizeWriteStream( + writeClient.finalizeWriteStream( FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build()); // Try to append to a finalized stream ApiFuture response2 = @@ -2065,7 +2081,7 @@ public void testStreamFinalizedError() public void testOffsetAlreadyExistsError() throws IOException, ExecutionException, InterruptedException { WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableId) .setWriteStream( @@ -2101,7 +2117,7 @@ public void testOffsetAlreadyExistsError() @Test public void testOffsetOutOfRangeError() throws IOException, InterruptedException { WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableId) .setWriteStream( @@ -2133,7 +2149,7 @@ public void testOffsetOutOfRangeError() throws IOException, InterruptedException @Test public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException { WriteStream writeStream = - client.createWriteStream( + writeClient.createWriteStream( CreateWriteStreamRequest.newBuilder() .setParent(tableId) .setWriteStream( @@ -2256,4 +2272,114 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi assertEquals("50", queryIter.next().get(0).getStringValue()); } } + + @Test + public void timestamp_arrowWrite() throws IOException { + String timestampFieldName = "timestamp"; + com.google.cloud.bigquery.Schema tableSchema = + com.google.cloud.bigquery.Schema.of( + Field.newBuilder(timestampFieldName, StandardSQLTypeName.TIMESTAMP) + .setMode(Mode.REQUIRED) + .build()); + + String tableName = "bqstorage_timestamp_write_arrow"; + TableId testTableId = TableId.of(DATASET, tableName); + bigquery.create( + TableInfo.of( + testTableId, StandardTableDefinition.newBuilder().setSchema(tableSchema).build())); + + List fields = + ImmutableList.of( + new org.apache.arrow.vector.types.pojo.Field( + timestampFieldName, + FieldType.nullable( + new ArrowType.Timestamp( + org.apache.arrow.vector.types.TimeUnit.MICROSECOND, "UTC")), + null)); + org.apache.arrow.vector.types.pojo.Schema schema = + new org.apache.arrow.vector.types.pojo.Schema(fields, null); + + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + try (StreamWriter streamWriter = + StreamWriter.newBuilder(parent.toString() + "/_default").setWriterSchema(schema).build()) { + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + TimeStampMicroTZVector timestampVector = + (TimeStampMicroTZVector) root.getVector(timestampFieldName); + timestampVector.allocateNew(Helper.INPUT_TIMESTAMPS_MICROS.length); + + for (int i = 0; i < Helper.INPUT_TIMESTAMPS_MICROS.length; i++) { + timestampVector.set(i, Helper.INPUT_TIMESTAMPS_MICROS[i]); + } + root.setRowCount(Helper.INPUT_TIMESTAMPS_MICROS.length); + + CompressionCodec codec = + NoCompressionCodec.Factory.INSTANCE.createCodec( + CompressionUtil.CodecType.NO_COMPRESSION); + VectorUnloader vectorUnloader = + new VectorUnloader(root, /* includeNullCount= */ true, codec, /* alignBuffers= */ true); + 
org.apache.arrow.vector.ipc.message.ArrowRecordBatch batch = + vectorUnloader.getRecordBatch(); + // Asynchronous append. + streamWriter.append(batch, -1); + } + } + String table = + BigQueryResource.formatTableResource( + ServiceOptions.getDefaultProjectId(), DATASET, tableName); + List rows = Helper.readAllRows(readClient, parentProjectId, table, null); + List timestamps = + rows.stream().map(x -> (Long) x.get(timestampFieldName)).collect(Collectors.toList()); + assertEquals(timestamps.size(), Helper.EXPECTED_TIMESTAMPS_MICROS.length); + for (int i = 0; i < timestamps.size(); i++) { + assertEquals(timestamps.get(i), Helper.EXPECTED_TIMESTAMPS_MICROS[i]); + } + } + + @Test + public void timestamp_protobufWrite() + throws IOException, DescriptorValidationException, InterruptedException { + String timestampFieldName = "timestamp"; + com.google.cloud.bigquery.Schema bqTableSchema = + com.google.cloud.bigquery.Schema.of( + Field.newBuilder(timestampFieldName, StandardSQLTypeName.TIMESTAMP) + .setMode(Mode.REQUIRED) + .build()); + + String tableName = "bqstorage_timestamp_write_avro"; + TableId testTableId = TableId.of(DATASET, tableName); + bigquery.create( + TableInfo.of( + testTableId, StandardTableDefinition.newBuilder().setSchema(bqTableSchema).build())); + + TableFieldSchema TEST_TIMESTAMP = + TableFieldSchema.newBuilder() + .setName(timestampFieldName) + .setType(TableFieldSchema.Type.TIMESTAMP) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build(); + TableSchema tableSchema = TableSchema.newBuilder().addFields(TEST_TIMESTAMP).build(); + + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(parent.toString(), tableSchema).build()) { + for (long timestampMicro : Helper.INPUT_TIMESTAMPS_MICROS) { + JSONArray jsonArray = new JSONArray(); + JSONObject row = new JSONObject(); + row.put(timestampFieldName, timestampMicro); + jsonArray.put(row); + jsonStreamWriter.append(jsonArray); + } + } + + String table = + BigQueryResource.formatTableResource( + ServiceOptions.getDefaultProjectId(), DATASET, tableName); + List rows = Helper.readAllRows(readClient, parentProjectId, table, null); + List timestamps = + rows.stream().map(x -> (Long) x.get(timestampFieldName)).collect(Collectors.toList()); + assertEquals(timestamps.size(), Helper.EXPECTED_TIMESTAMPS_MICROS.length); + for (int i = 0; i < timestamps.size(); i++) { + assertEquals(timestamps.get(i), Helper.EXPECTED_TIMESTAMPS_MICROS[i]); + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index b2200ab086..8772a95632 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -38,6 +38,7 @@ import com.google.cloud.bigquery.storage.v1.StreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.storage.v1.it.util.WriteRetryTestUtil; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; diff --git 
a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java index ec835d944b..1573309606 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -26,6 +26,7 @@ import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.storage.v1.it.util.WriteRetryTestUtil; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.protobuf.Descriptors.DescriptorValidationException; import java.io.IOException; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/BigQueryResource.java similarity index 95% rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/BigQueryResource.java index 6a33b50f0f..04daffb348 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/BigQueryResource.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.google.cloud.bigquery.storage.v1.it; +package com.google.cloud.bigquery.storage.v1.it.util; /** Test helper class to generate BigQuery resource paths. 
*/ public class BigQueryResource { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java new file mode 100644 index 0000000000..8d91499fba --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java @@ -0,0 +1,167 @@ +package com.google.cloud.bigquery.storage.v1.it.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.gax.rpc.ServerStream; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.common.base.Preconditions; +import com.google.protobuf.Timestamp; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; + +public class Helper { + + public static final Long[] INPUT_TIMESTAMPS_MICROS = + new Long[] { + 1735734896123456L, // 2025-01-01T12:34:56.123456Z + 1580646896123456L, // 2020-02-02T12:34:56.123456Z + 636467696123456L, // 1990-03-03T12:34:56.123456Z + 165846896123456L // 1975-04-04T12:34:56.123456Z + }; + + public static final Long[] EXPECTED_TIMESTAMPS_MICROS = + new Long[] { + 1735734896123456L, // 2025-01-01T12:34:56.123456Z + 1580646896123456L, // 2020-02-02T12:34:56.123456Z + 636467696123456L, // 1990-03-03T12:34:56.123456Z + 165846896123456L // 1975-04-04T12:34:56.123456Z + }; + + public static ServiceAccountCredentials loadCredentials(String credentialFile) { + try (InputStream keyStream = new ByteArrayInputStream(credentialFile.getBytes())) { + return ServiceAccountCredentials.fromStream(keyStream); + } catch (IOException e) { + fail("Couldn't create fake JSON credentials."); + } + return null; + } + + public static class AppendCompleteCallback implements ApiFutureCallback { + private static final Object lock = new Object(); + private static int batchCount = 0; + + public void onSuccess(AppendRowsResponse response) { + synchronized (lock) { + if (response.hasError()) { + System.out.format("Error: %s\n", response.getError()); + } else { + ++batchCount; + System.out.format("Wrote batch %d\n", batchCount); + } + } + } + + public void onFailure(Throwable throwable) { + System.out.format("Error: %s\n", throwable.toString()); + } + } + + /** + * Reads all the rows from the specified table. + * + *
<p>
For every row, the consumer is called for processing. + * + * @param table + * @param snapshotInMillis Optional. If specified, all rows up to timestamp will be returned. + * @param filter Optional. If specified, it will be used to restrict returned data. + * @param consumer that receives all Avro rows. + * @throws IOException + */ + public static void processRowsAtSnapshot( + BigQueryReadClient client, + String parentProjectId, + String table, + Long snapshotInMillis, + String filter, + SimpleRowReaderAvro.AvroRowConsumer consumer) + throws IOException { + Preconditions.checkNotNull(table); + Preconditions.checkNotNull(consumer); + + CreateReadSessionRequest.Builder createSessionRequestBuilder = + CreateReadSessionRequest.newBuilder() + .setParent(parentProjectId) + .setMaxStreamCount(1) + .setReadSession( + ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.AVRO).build()); + + if (snapshotInMillis != null) { + Timestamp snapshotTimestamp = + Timestamp.newBuilder() + .setSeconds(snapshotInMillis / 1_000) + .setNanos((int) ((snapshotInMillis % 1000) * 1000000)) + .build(); + createSessionRequestBuilder + .getReadSessionBuilder() + .setTableModifiers( + ReadSession.TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build()); + } + + if (filter != null && !filter.isEmpty()) { + createSessionRequestBuilder + .getReadSessionBuilder() + .setReadOptions( + ReadSession.TableReadOptions.newBuilder().setRowRestriction(filter).build()); + } + + ReadSession session = client.createReadSession(createSessionRequestBuilder.build()); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession" + + " response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); + + SimpleRowReaderAvro reader = + new SimpleRowReaderAvro(new Schema.Parser().parse(session.getAvroSchema().getSchema())); + + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + reader.processRows(response.getAvroRows(), consumer); + } + } + + /** + * Reads all the rows from the specified table and returns a list as generic Avro records. + * + * @param table + * @param filter Optional. If specified, it will be used to restrict returned data. + * @return + */ + public static List readAllRows( + BigQueryReadClient client, String parentProjectId, String table, String filter) + throws IOException { + final List rows = new ArrayList<>(); + processRowsAtSnapshot( + client, + parentProjectId, + /* table= */ table, + /* snapshotInMillis= */ null, + /* filter= */ filter, + (SimpleRowReaderAvro.AvroRowConsumer) + record -> { + // clone the record since that reference will be reused by the reader. 
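+                // GenericRecordBuilder copies the source record's field values, so build()
+                // returns an independent copy that outlives the reader's reusable instance.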
+ rows.add(new GenericRecordBuilder(record).build()); + }); + return rows; + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderArrow.java similarity index 97% rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderArrow.java index 7ccdf4526d..e4afb9b1ab 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderArrow.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.google.cloud.bigquery.storage.v1.it; +package com.google.cloud.bigquery.storage.v1.it.util; import static com.google.common.truth.Truth.assertThat; @@ -49,10 +49,10 @@ public interface ArrowBatchConsumer { void accept(VectorSchemaRoot root); } - static class ArrowTimestampBatchConsumer implements ArrowBatchConsumer { + public static class ArrowTimestampBatchConsumer implements ArrowBatchConsumer { private final List expectedTimestampValues; - ArrowTimestampBatchConsumer(List expectedTimestampValues) { + public ArrowTimestampBatchConsumer(List expectedTimestampValues) { this.expectedTimestampValues = expectedTimestampValues; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderAvro.java similarity index 97% rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderAvro.java index a23179c8c8..4914e93f5b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderAvro.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.google.cloud.bigquery.storage.v1.it; +package com.google.cloud.bigquery.storage.v1.it.util; import com.google.cloud.bigquery.storage.v1.AvroRows; import com.google.common.base.Preconditions; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/WriteRetryTestUtil.java similarity index 99% rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/WriteRetryTestUtil.java index e11e0707df..872fe2dbb8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/WriteRetryTestUtil.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package com.google.cloud.bigquery.storage.v1.it; +package com.google.cloud.bigquery.storage.v1.it.util; import static org.junit.Assert.assertFalse; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java index 7fb96b5596..ad8e66d956 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java @@ -244,7 +244,7 @@ public void testSimpleRead() { @Test public void testSimpleReadArrow() { String table = - com.google.cloud.bigquery.storage.v1.it.BigQueryResource.formatTableResource( + com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource.formatTableResource( /* projectId= */ "bigquery-public-data", /* datasetId= */ "samples", /* tableId= */ "shakespeare"); @@ -308,7 +308,7 @@ public void testRangeType() throws InterruptedException { bigquery.query(createTable); String table = - com.google.cloud.bigquery.storage.v1.it.BigQueryResource.formatTableResource( + com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource.formatTableResource( /* projectId= */ ServiceOptions.getDefaultProjectId(), /* datasetId= */ DATASET, /* tableId= */ tableId.getTable()); From ed8b4b9eda4382b09dd3162a00444a5d1fd87419 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 16 Dec 2025 00:06:07 -0500 Subject: [PATCH 4/8] chore: Rename to ITBigQueryStorageWriteClientTest --- .../storage/v1/it/ITBigQueryStorageLongRunningTest.java | 3 +-- ...ntTest.java => ITBigQueryStorageWriteClientTest.java} | 9 ++++----- 2 files changed, 5 insertions(+), 7 deletions(-) rename google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/{ITBigQueryWriteClientTest.java => ITBigQueryStorageWriteClientTest.java} (99%) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java index 1fd39ec5e0..3db3bdc6f9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java @@ -26,6 +26,7 @@ import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.cloud.bigquery.storage.v1.ReadStream; +import com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -36,8 +37,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; - -import com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource; import org.junit.AfterClass; import org.junit.Assume; import org.junit.BeforeClass; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java similarity index 99% rename from 
google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java index 5e1b1068fc..ccb979fe28 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java @@ -87,8 +87,9 @@ import org.junit.Test; /** Integration tests for BigQuery Write API. */ -public class ITBigQueryWriteClientTest { - private static final Logger LOG = Logger.getLogger(ITBigQueryWriteClientTest.class.getName()); +public class ITBigQueryStorageWriteClientTest { + private static final Logger LOG = + Logger.getLogger(ITBigQueryStorageWriteClientTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; @@ -100,7 +101,6 @@ public class ITBigQueryWriteClientTest { private static BigQueryReadClient readClient; private static BigQueryWriteClient writeClient; - private static String projectName; private static String parentProjectId; private static TableInfo tableInfo; private static TableInfo tableInfo2; @@ -138,8 +138,7 @@ public static void beforeClass() throws IOException { BigQueryWriteSettings settings = BigQueryWriteSettings.newBuilder().setHeaderProvider(USER_AGENT_HEADER_PROVIDER).build(); writeClient = BigQueryWriteClient.create(settings); - projectName = ServiceOptions.getDefaultProjectId(); - parentProjectId = String.format("projects/%s", projectName); + parentProjectId = String.format("projects/%s", ServiceOptions.getDefaultProjectId()); RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); bigquery = bigqueryHelper.getOptions().getService(); From aca72f0bc4c810a18fb11f5c07d2858cb6f14c08 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 16 Dec 2025 00:20:38 -0500 Subject: [PATCH 5/8] chore: Add header for Helper util class --- .../bigquery/storage/v1/it/util/Helper.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java index 8d91499fba..d8b7d9211a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java @@ -1,3 +1,19 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package com.google.cloud.bigquery.storage.v1.it.util; import static org.junit.Assert.assertEquals; From 7b16577b30c4f9de142b5117c625994e22de2cb4 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 16 Dec 2025 15:25:06 -0500 Subject: [PATCH 6/8] chore: Address PR comments --- .../cloud/bigquery/storage/v1/it/util/Helper.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java index d8b7d9211a..a73d24ab1b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java @@ -30,7 +30,7 @@ import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.common.base.Preconditions; -import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -68,8 +68,8 @@ public static ServiceAccountCredentials loadCredentials(String credentialFile) { } public static class AppendCompleteCallback implements ApiFutureCallback { - private static final Object lock = new Object(); - private static int batchCount = 0; + private final Object lock = new Object(); + private int batchCount = 0; public void onSuccess(AppendRowsResponse response) { synchronized (lock) { @@ -117,15 +117,12 @@ public static void processRowsAtSnapshot( ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.AVRO).build()); if (snapshotInMillis != null) { - Timestamp snapshotTimestamp = - Timestamp.newBuilder() - .setSeconds(snapshotInMillis / 1_000) - .setNanos((int) ((snapshotInMillis % 1000) * 1000000)) - .build(); createSessionRequestBuilder .getReadSessionBuilder() .setTableModifiers( - ReadSession.TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build()); + ReadSession.TableModifiers.newBuilder() + .setSnapshotTime(Timestamps.fromMicros(snapshotInMillis)) + .build()); } if (filter != null && !filter.isEmpty()) { From 01ce666bc074b84326cff135ef323f1144625889 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 16 Dec 2025 15:46:18 -0500 Subject: [PATCH 7/8] chore: Add protobuf-java-util testing scope --- google-cloud-bigquerystorage/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 581129ad59..ca084e5773 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -232,6 +232,11 @@ arrow-memory-core test + + com.google.protobuf + protobuf-java-util + test + io.grpc From e83c489877929085f258d509071911900c2a1583 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 16 Dec 2025 17:05:40 -0500 Subject: [PATCH 8/8] chore: Use Timestamps.fromMillis --- .../com/google/cloud/bigquery/storage/v1/it/util/Helper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java index a73d24ab1b..26883f59ff 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java 
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java @@ -121,7 +121,7 @@ public static void processRowsAtSnapshot( .getReadSessionBuilder() .setTableModifiers( ReadSession.TableModifiers.newBuilder() - .setSnapshotTime(Timestamps.fromMicros(snapshotInMillis)) + .setSnapshotTime(Timestamps.fromMillis(snapshotInMillis)) .build()); }