serverStream =
+ readClient.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : serverStream) {
rowCount += response.getRowCount();
@@ -1717,92 +1850,6 @@ private long ReadStreamToOffset(ReadStream readStream, long rowOffset) {
return rowCount;
}
- /**
- * Reads all the rows from the specified table.
- *
- * For every row, the consumer is called for processing.
- *
- * @param table
- * @param snapshotInMillis Optional. If specified, all rows up to timestamp will be returned.
- * @param filter Optional. If specified, it will be used to restrict returned data.
- * @param consumer that receives all Avro rows.
- * @throws IOException
- */
- private void ProcessRowsAtSnapshot(
- String table, Long snapshotInMillis, String filter, AvroRowConsumer consumer)
- throws IOException {
- Preconditions.checkNotNull(table);
- Preconditions.checkNotNull(consumer);
-
- CreateReadSessionRequest.Builder createSessionRequestBuilder =
- CreateReadSessionRequest.newBuilder()
- .setParent(parentProjectId)
- .setMaxStreamCount(1)
- .setReadSession(
- ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.AVRO).build());
-
- if (snapshotInMillis != null) {
- Timestamp snapshotTimestamp =
- Timestamp.newBuilder()
- .setSeconds(snapshotInMillis / 1_000)
- .setNanos((int) ((snapshotInMillis % 1000) * 1000000))
- .build();
- createSessionRequestBuilder
- .getReadSessionBuilder()
- .setTableModifiers(
- TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build());
- }
-
- if (filter != null && !filter.isEmpty()) {
- createSessionRequestBuilder
- .getReadSessionBuilder()
- .setReadOptions(TableReadOptions.newBuilder().setRowRestriction(filter).build());
- }
-
- ReadSession session = client.createReadSession(createSessionRequestBuilder.build());
- assertEquals(
- String.format(
- "Did not receive expected number of streams for table '%s' CreateReadSession"
- + " response:%n%s",
- table, session.toString()),
- 1,
- session.getStreamsCount());
-
- ReadRowsRequest readRowsRequest =
- ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build();
-
- SimpleRowReaderAvro reader =
- new SimpleRowReaderAvro(new Schema.Parser().parse(session.getAvroSchema().getSchema()));
-
- ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
- for (ReadRowsResponse response : stream) {
- reader.processRows(response.getAvroRows(), consumer);
- }
- }
-
- /**
- * Reads all the rows from the specified table and returns a list as generic Avro records.
- *
- * @param table
- * @param filter Optional. If specified, it will be used to restrict returned data.
- * @return
- */
- List<GenericData.Record> ReadAllRows(String table, String filter) throws IOException {
- final List<GenericData.Record> rows = new ArrayList<>();
- ProcessRowsAtSnapshot(
- /* table= */ table,
- /* snapshotInMillis= */ null,
- /* filter= */ filter,
- new AvroRowConsumer() {
- @Override
- public void accept(GenericData.Record record) {
- // clone the record since that reference will be reused by the reader.
- rows.add(new GenericRecordBuilder(record).build());
- }
- });
- return rows;
- }
-
/**
* Runs a query job with WRITE_APPEND disposition to the destination table and returns the
* successfully completed job.
@@ -1812,9 +1859,9 @@ public void accept(GenericData.Record record) {
* @return
* @throws InterruptedException
*/
- private Job RunQueryAppendJobAndExpectSuccess(TableId destinationTableId, String query)
+ private Job runQueryAppendJobAndExpectSuccess(TableId destinationTableId, String query)
throws InterruptedException {
- return RunQueryJobAndExpectSuccess(
+ return runQueryJobAndExpectSuccess(
QueryJobConfiguration.newBuilder(query)
.setDestinationTable(destinationTableId)
.setUseQueryCache(false)
@@ -1830,7 +1877,7 @@ private Job RunQueryAppendJobAndExpectSuccess(TableId destinationTableId, String
* @return
* @throws InterruptedException
*/
- private Job RunQueryJobAndExpectSuccess(QueryJobConfiguration configuration)
+ private Job runQueryJobAndExpectSuccess(QueryJobConfiguration configuration)
throws InterruptedException {
Job job = bigquery.create(JobInfo.of(configuration));
Job completedJob =
@@ -1846,33 +1893,4 @@ private Job RunQueryJobAndExpectSuccess(QueryJobConfiguration configuration)
return completedJob;
}
-
- static ServiceAccountCredentials loadCredentials(String credentialFile) {
- try (InputStream keyStream = new ByteArrayInputStream(credentialFile.getBytes())) {
- return ServiceAccountCredentials.fromStream(keyStream);
- } catch (IOException e) {
- fail("Couldn't create fake JSON credentials.");
- }
- return null;
- }
-
- static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
- private static final Object lock = new Object();
- private static int batchCount = 0;
-
- public void onSuccess(AppendRowsResponse response) {
- synchronized (lock) {
- if (response.hasError()) {
- System.out.format("Error: %s\n", response.getError());
- } else {
- ++batchCount;
- System.out.format("Wrote batch %d\n", batchCount);
- }
- }
- }
-
- public void onFailure(Throwable throwable) {
- System.out.format("Error: %s\n", throwable.toString());
- }
- }
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java
similarity index 92%
rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java
rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java
index 7af2eece1e..ccb979fe28 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java
@@ -41,6 +41,8 @@
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange;
import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
+import com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource;
+import com.google.cloud.bigquery.storage.v1.it.util.Helper;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
@@ -65,6 +67,7 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
@@ -75,6 +78,7 @@
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.avro.generic.GenericData;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.AfterClass;
@@ -83,8 +87,9 @@
import org.junit.Test;
/** Integration tests for BigQuery Write API. */
-public class ITBigQueryWriteClientTest {
- private static final Logger LOG = Logger.getLogger(ITBigQueryWriteClientTest.class.getName());
+public class ITBigQueryStorageWriteClientTest {
+ private static final Logger LOG =
+ Logger.getLogger(ITBigQueryStorageWriteClientTest.class.getName());
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
@@ -94,7 +99,9 @@ public class ITBigQueryWriteClientTest {
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";
- private static BigQueryWriteClient client;
+ private static BigQueryReadClient readClient;
+ private static BigQueryWriteClient writeClient;
+ private static String parentProjectId;
private static TableInfo tableInfo;
private static TableInfo tableInfo2;
@@ -126,9 +133,12 @@ public StringWithSecondsNanos(String fooParam, long secondsParam, int nanosParam
@BeforeClass
public static void beforeClass() throws IOException {
+ readClient = BigQueryReadClient.create();
+
BigQueryWriteSettings settings =
BigQueryWriteSettings.newBuilder().setHeaderProvider(USER_AGENT_HEADER_PROVIDER).build();
- client = BigQueryWriteClient.create(settings);
+ writeClient = BigQueryWriteClient.create(settings);
+ parentProjectId = String.format("projects/%s", ServiceOptions.getDefaultProjectId());
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
bigquery = bigqueryHelper.getOptions().getService();
@@ -217,9 +227,14 @@ public static void beforeClass() throws IOException {
@AfterClass
public static void afterClass() throws InterruptedException {
- if (client != null) {
- client.close();
- client.awaitTermination(10, TimeUnit.SECONDS);
+ if (writeClient != null) {
+ writeClient.close();
+ writeClient.awaitTermination(10, TimeUnit.SECONDS);
+ }
+
+ if (readClient != null) {
+ readClient.close();
+ readClient.awaitTermination(10, TimeUnit.SECONDS);
}
if (bigquery != null) {
@@ -303,7 +318,7 @@ ProtoRows createProtoRowsMixed(StringWithSecondsNanos[] messages) {
public void testBatchWriteWithCommittedStreamEU()
throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableIdEU)
.setWriteStream(
@@ -333,7 +348,7 @@ public void testBatchWriteWithCommittedStreamEU()
public void testProto3OptionalBatchWriteWithCommittedStream()
throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
@@ -385,7 +400,7 @@ public void testJsonStreamWriterCommittedStream()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
@@ -555,7 +570,7 @@ public void testRequestProfilerWithCommittedStream()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
@@ -635,7 +650,7 @@ public void testJsonStreamWriterWithDefaultSchema()
// Create JsonStreamWriter with newBuilder(streamOrTable, client)
try (JsonStreamWriter jsonStreamWriter =
- JsonStreamWriter.newBuilder(parent.toString(), client)
+ JsonStreamWriter.newBuilder(parent.toString(), writeClient)
.setIgnoreUnknownFields(true)
.build()) {
LOG.info("Sending one message");
@@ -729,7 +744,7 @@ public void testJsonStreamWriterWithDefaultSchemaNoTable() {
// Create JsonStreamWriter with newBuilder(streamOrTable, client)
try (JsonStreamWriter ignore =
- JsonStreamWriter.newBuilder(parent.toString(), client)
+ JsonStreamWriter.newBuilder(parent.toString(), writeClient)
.setIgnoreUnknownFields(true)
.build()) {
// Do nothing
@@ -920,7 +935,7 @@ public void testJsonDefaultStreamOnTableWithDefaultValue_SchemaNotGiven()
TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build();
bigquery.create(tableInfo);
try (JsonStreamWriter jsonStreamWriter =
- JsonStreamWriter.newBuilder(defaultTableId, client)
+ JsonStreamWriter.newBuilder(defaultTableId, writeClient)
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.build()) {
testJsonStreamWriterForDefaultValue(jsonStreamWriter);
@@ -943,7 +958,7 @@ public void testJsonExclusiveStreamOnTableWithDefaultValue_GiveTableSchema()
TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build();
bigquery.create(tableInfo);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(exclusiveTableId)
.setWriteStream(
@@ -1180,7 +1195,7 @@ private void testArrowIngestion(boolean serializedInput)
}
if (serializedInput) {
try (StreamWriter streamWriter =
- StreamWriter.newBuilder(tableId + "/_default", client)
+ StreamWriter.newBuilder(tableId + "/_default", writeClient)
.setWriterSchema(v1ArrowSchema)
.setTraceId(TEST_TRACE_ID)
.setMaxRetryDuration(java.time.Duration.ofSeconds(5))
@@ -1197,7 +1212,7 @@ private void testArrowIngestion(boolean serializedInput)
}
} else {
try (StreamWriter streamWriter =
- StreamWriter.newBuilder(tableId + "/_default", client)
+ StreamWriter.newBuilder(tableId + "/_default", writeClient)
.setWriterSchema(arrowSchema)
.setTraceId(TEST_TRACE_ID)
.setMaxRetryDuration(java.time.Duration.ofSeconds(5))
@@ -1248,7 +1263,7 @@ public void testJsonStreamWriterWithMessagesOver10M()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
@@ -1296,14 +1311,14 @@ public void testJsonStreamWriterSchemaUpdate()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
- JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) {
+ JsonStreamWriter.newBuilder(writeStream.getName(), writeClient).build()) {
// write the 1st row
JSONObject foo = new JSONObject();
foo.put("col1", "aaa");
@@ -1381,7 +1396,7 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
@@ -1413,7 +1428,7 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
// Start writing using the JsonWriter
try (JsonStreamWriter jsonStreamWriter =
- JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) {
+ JsonStreamWriter.newBuilder(writeStream.getName(), writeClient).build()) {
int numberOfThreads = 5;
CountDownLatch latch;
AtomicInteger next;
@@ -1505,7 +1520,7 @@ public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
@@ -1518,7 +1533,7 @@ public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap()
"date_with_default_to_current", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
try (JsonStreamWriter jsonStreamWriter =
- JsonStreamWriter.newBuilder(writeStream.getName(), client)
+ JsonStreamWriter.newBuilder(writeStream.getName(), writeClient)
.setMissingValueInterpretationMap(missingValueMap)
.build()) {
// Verify the missing value map
@@ -1644,7 +1659,7 @@ public void testJsonStreamWriterWithFlexibleColumnName()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
@@ -1737,7 +1752,7 @@ public void testJsonStreamWriterWithNestedFlexibleColumnName()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
@@ -1814,14 +1829,14 @@ public void testJsonStreamWriterSchemaUpdateWithFlexibleColumnName()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
- JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) {
+ JsonStreamWriter.newBuilder(writeStream.getName(), writeClient).build()) {
// write the 1st row
JSONObject foo = new JSONObject();
foo.put("col1-列", "aaa");
@@ -1891,7 +1906,7 @@ public void testComplicateSchemaWithPendingStream()
throws IOException, InterruptedException, ExecutionException {
LOG.info("Create a write stream");
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId2)
.setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
@@ -1919,7 +1934,7 @@ public void testComplicateSchemaWithPendingStream()
LOG.info("Finalize a write stream");
finalizeResponse =
- client.finalizeWriteStream(
+ writeClient.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
ApiFuture<AppendRowsResponse> response3 =
@@ -1934,7 +1949,7 @@ public void testComplicateSchemaWithPendingStream()
assertEquals(2, finalizeResponse.getRowCount());
LOG.info("Commit a write stream");
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
- client.batchCommitWriteStreams(
+ writeClient.batchCommitWriteStreams(
BatchCommitWriteStreamsRequest.newBuilder()
.setParent(tableId2)
.addWriteStreams(writeStream.getName())
@@ -1961,7 +1976,7 @@ public void testComplicateSchemaWithPendingStream()
@Test
public void testStreamError() throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
@@ -1995,7 +2010,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio
@Test
public void testStreamSchemaMisMatchError() throws IOException, InterruptedException {
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
@@ -2027,7 +2042,7 @@ public void testStreamSchemaMisMatchError() throws IOException, InterruptedExcep
public void testStreamFinalizedError()
throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
@@ -2042,7 +2057,7 @@ public void testStreamFinalizedError()
streamWriter.append(createProtoRowsMultipleColumns(new String[] {"a"}), /* offset= */ 0);
response.get();
// Finalize the stream in order to trigger STREAM_FINALIZED error
- client.finalizeWriteStream(
+ writeClient.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
// Try to append to a finalized stream
ApiFuture<AppendRowsResponse> response2 =
@@ -2065,7 +2080,7 @@ public void testStreamFinalizedError()
public void testOffsetAlreadyExistsError()
throws IOException, ExecutionException, InterruptedException {
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
@@ -2101,7 +2116,7 @@ public void testOffsetAlreadyExistsError()
@Test
public void testOffsetOutOfRangeError() throws IOException, InterruptedException {
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
@@ -2133,7 +2148,7 @@ public void testOffsetOutOfRangeError() throws IOException, InterruptedException
@Test
public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
- client.createWriteStream(
+ writeClient.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
@@ -2256,4 +2271,114 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi
assertEquals("50", queryIter.next().get(0).getStringValue());
}
}
+
+ @Test
+ public void timestamp_arrowWrite() throws IOException {
+ String timestampFieldName = "timestamp";
+ com.google.cloud.bigquery.Schema tableSchema =
+ com.google.cloud.bigquery.Schema.of(
+ Field.newBuilder(timestampFieldName, StandardSQLTypeName.TIMESTAMP)
+ .setMode(Mode.REQUIRED)
+ .build());
+
+ String tableName = "bqstorage_timestamp_write_arrow";
+ TableId testTableId = TableId.of(DATASET, tableName);
+ bigquery.create(
+ TableInfo.of(
+ testTableId, StandardTableDefinition.newBuilder().setSchema(tableSchema).build()));
+
+ List<org.apache.arrow.vector.types.pojo.Field> fields =
+ ImmutableList.of(
+ new org.apache.arrow.vector.types.pojo.Field(
+ timestampFieldName,
+ FieldType.nullable(
+ new ArrowType.Timestamp(
+ org.apache.arrow.vector.types.TimeUnit.MICROSECOND, "UTC")),
+ null));
+ org.apache.arrow.vector.types.pojo.Schema schema =
+ new org.apache.arrow.vector.types.pojo.Schema(fields, null);
+
+ TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
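+ // The "/_default" suffix targets the table's default write stream, so no explicit write stream is created.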
+ try (StreamWriter streamWriter =
+ StreamWriter.newBuilder(parent.toString() + "/_default").setWriterSchema(schema).build()) {
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+ TimeStampMicroTZVector timestampVector =
+ (TimeStampMicroTZVector) root.getVector(timestampFieldName);
+ timestampVector.allocateNew(Helper.INPUT_TIMESTAMPS_MICROS.length);
+
+ for (int i = 0; i < Helper.INPUT_TIMESTAMPS_MICROS.length; i++) {
+ timestampVector.set(i, Helper.INPUT_TIMESTAMPS_MICROS[i]);
+ }
+ root.setRowCount(Helper.INPUT_TIMESTAMPS_MICROS.length);
+
+ CompressionCodec codec =
+ NoCompressionCodec.Factory.INSTANCE.createCodec(
+ CompressionUtil.CodecType.NO_COMPRESSION);
+ VectorUnloader vectorUnloader =
+ new VectorUnloader(root, /* includeNullCount= */ true, codec, /* alignBuffers= */ true);
+ org.apache.arrow.vector.ipc.message.ArrowRecordBatch batch =
+ vectorUnloader.getRecordBatch();
+ // Asynchronous append.
+ streamWriter.append(batch, -1);
+ }
+ }
+ String table =
+ BigQueryResource.formatTableResource(
+ ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+ List<GenericData.Record> rows = Helper.readAllRows(readClient, parentProjectId, table, null);
+ List<Long> timestamps =
+ rows.stream().map(x -> (Long) x.get(timestampFieldName)).collect(Collectors.toList());
+ assertEquals(timestamps.size(), Helper.EXPECTED_TIMESTAMPS_MICROS.length);
+ for (int i = 0; i < timestamps.size(); i++) {
+ assertEquals(timestamps.get(i), Helper.EXPECTED_TIMESTAMPS_MICROS[i]);
+ }
+ }
+
+ @Test
+ public void timestamp_protobufWrite()
+ throws IOException, DescriptorValidationException, InterruptedException {
+ String timestampFieldName = "timestamp";
+ com.google.cloud.bigquery.Schema bqTableSchema =
+ com.google.cloud.bigquery.Schema.of(
+ Field.newBuilder(timestampFieldName, StandardSQLTypeName.TIMESTAMP)
+ .setMode(Mode.REQUIRED)
+ .build());
+
+ String tableName = "bqstorage_timestamp_write_avro";
+ TableId testTableId = TableId.of(DATASET, tableName);
+ bigquery.create(
+ TableInfo.of(
+ testTableId, StandardTableDefinition.newBuilder().setSchema(bqTableSchema).build()));
+
+ TableFieldSchema TEST_TIMESTAMP =
+ TableFieldSchema.newBuilder()
+ .setName(timestampFieldName)
+ .setType(TableFieldSchema.Type.TIMESTAMP)
+ .setMode(TableFieldSchema.Mode.NULLABLE)
+ .build();
+ TableSchema tableSchema = TableSchema.newBuilder().addFields(TEST_TIMESTAMP).build();
+
+ TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
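+ // Building the JsonStreamWriter with a table name (rather than a stream name) appends to the table's default stream.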
+ try (JsonStreamWriter jsonStreamWriter =
+ JsonStreamWriter.newBuilder(parent.toString(), tableSchema).build()) {
+ for (long timestampMicro : Helper.INPUT_TIMESTAMPS_MICROS) {
+ JSONArray jsonArray = new JSONArray();
+ JSONObject row = new JSONObject();
+ row.put(timestampFieldName, timestampMicro);
+ jsonArray.put(row);
+ jsonStreamWriter.append(jsonArray);
+ }
+ }
+
+ String table =
+ BigQueryResource.formatTableResource(
+ ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+ List<GenericData.Record> rows = Helper.readAllRows(readClient, parentProjectId, table, null);
+ List<Long> timestamps =
+ rows.stream().map(x -> (Long) x.get(timestampFieldName)).collect(Collectors.toList());
+ assertEquals(timestamps.size(), Helper.EXPECTED_TIMESTAMPS_MICROS.length);
+ for (int i = 0; i < timestamps.size(); i++) {
+ assertEquals(timestamps.get(i), Helper.EXPECTED_TIMESTAMPS_MICROS[i]);
+ }
+ }
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java
index b2200ab086..8772a95632 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java
@@ -38,6 +38,7 @@
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.cloud.bigquery.storage.v1.it.util.WriteRetryTestUtil;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java
index ec835d944b..1573309606 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java
@@ -26,6 +26,7 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.cloud.bigquery.storage.v1.it.util.WriteRetryTestUtil;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/BigQueryResource.java
similarity index 95%
rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java
rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/BigQueryResource.java
index 6a33b50f0f..04daffb348 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/BigQueryResource.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.google.cloud.bigquery.storage.v1.it;
+package com.google.cloud.bigquery.storage.v1.it.util;
/** Test helper class to generate BigQuery resource paths. */
public class BigQueryResource {
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java
new file mode 100644
index 0000000000..26883f59ff
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1.it.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.util.Timestamps;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+
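+/** Shared test helpers for the BigQuery Storage integration tests: timestamp fixtures and read-back utilities. */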
+public class Helper {
+
+ public static final Long[] INPUT_TIMESTAMPS_MICROS =
+ new Long[] {
+ 1735734896123456L, // 2025-01-01T12:34:56.123456Z
+ 1580646896123456L, // 2020-02-02T12:34:56.123456Z
+ 636467696123456L, // 1990-03-03T12:34:56.123456Z
+ 165846896123456L // 1975-04-04T12:34:56.123456Z
+ };
+
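+ // TIMESTAMP values are written and read back as microseconds since the Unix epoch, so the
+ // expected values match the inputs exactly.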
+ public static final Long[] EXPECTED_TIMESTAMPS_MICROS =
+ new Long[] {
+ 1735734896123456L, // 2025-01-01T12:34:56.123456Z
+ 1580646896123456L, // 2020-02-02T12:34:56.123456Z
+ 636467696123456L, // 1990-03-03T12:34:56.123456Z
+ 165846896123456L // 1975-04-04T12:34:56.123456Z
+ };
+
+ public static ServiceAccountCredentials loadCredentials(String credentialFile) {
+ try (InputStream keyStream = new ByteArrayInputStream(credentialFile.getBytes())) {
+ return ServiceAccountCredentials.fromStream(keyStream);
+ } catch (IOException e) {
+ fail("Couldn't create fake JSON credentials.");
+ }
+ return null;
+ }
+
+ public static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
+ private final Object lock = new Object();
+ private int batchCount = 0;
+
+ public void onSuccess(AppendRowsResponse response) {
+ synchronized (lock) {
+ if (response.hasError()) {
+ System.out.format("Error: %s\n", response.getError());
+ } else {
+ ++batchCount;
+ System.out.format("Wrote batch %d\n", batchCount);
+ }
+ }
+ }
+
+ public void onFailure(Throwable throwable) {
+ System.out.format("Error: %s\n", throwable.toString());
+ }
+ }
+
+ /**
+ * Reads all the rows from the specified table.
+ *
+ * <p>For every row, the consumer is called for processing.
+ *
+ * @param client the read client used to create the read session and stream the rows.
+ * @param parentProjectId the parent project resource name, in the form "projects/{project-id}".
+ * @param table the fully-qualified resource name of the table to read.
+ * @param snapshotInMillis Optional. If specified, all rows up to this timestamp will be returned.
+ * @param filter Optional. If specified, it will be used to restrict returned data.
+ * @param consumer the consumer that receives all Avro rows.
+ * @throws IOException if the rows cannot be read from the stream.
+ */
+ public static void processRowsAtSnapshot(
+ BigQueryReadClient client,
+ String parentProjectId,
+ String table,
+ Long snapshotInMillis,
+ String filter,
+ SimpleRowReaderAvro.AvroRowConsumer consumer)
+ throws IOException {
+ Preconditions.checkNotNull(table);
+ Preconditions.checkNotNull(consumer);
+
+ CreateReadSessionRequest.Builder createSessionRequestBuilder =
+ CreateReadSessionRequest.newBuilder()
+ .setParent(parentProjectId)
+ .setMaxStreamCount(1)
+ .setReadSession(
+ ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.AVRO).build());
+
+ if (snapshotInMillis != null) {
+ createSessionRequestBuilder
+ .getReadSessionBuilder()
+ .setTableModifiers(
+ ReadSession.TableModifiers.newBuilder()
+ .setSnapshotTime(Timestamps.fromMillis(snapshotInMillis))
+ .build());
+ }
+
+ if (filter != null && !filter.isEmpty()) {
+ createSessionRequestBuilder
+ .getReadSessionBuilder()
+ .setReadOptions(
+ ReadSession.TableReadOptions.newBuilder().setRowRestriction(filter).build());
+ }
+
+ ReadSession session = client.createReadSession(createSessionRequestBuilder.build());
+ assertEquals(
+ String.format(
+ "Did not receive expected number of streams for table '%s' CreateReadSession"
+ + " response:%n%s",
+ table, session.toString()),
+ 1,
+ session.getStreamsCount());
+
+ ReadRowsRequest readRowsRequest =
+ ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build();
+
+ SimpleRowReaderAvro reader =
+ new SimpleRowReaderAvro(new Schema.Parser().parse(session.getAvroSchema().getSchema()));
+
+ ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
+ for (ReadRowsResponse response : stream) {
+ reader.processRows(response.getAvroRows(), consumer);
+ }
+ }
+
+ /**
+ * Reads all the rows from the specified table and returns them as a list of generic Avro records.
+ *
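+ * <p>A minimal usage sketch (the {@code readClient}, {@code parentProjectId}, and filter values
+ * here are illustrative):
+ *
+ * <pre>{@code
+ * List<GenericData.Record> rows =
+ *     Helper.readAllRows(readClient, parentProjectId, table, "col1 = 'aaa'");
+ * }</pre>
+ *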
+ * @param client the read client used to create the read session and stream the rows.
+ * @param parentProjectId the parent project resource name, in the form "projects/{project-id}".
+ * @param table the fully-qualified resource name of the table to read.
+ * @param filter Optional. If specified, it will be used to restrict returned data.
+ * @return the table rows as generic Avro records.
+ */
+ public static List<GenericData.Record> readAllRows(
+ BigQueryReadClient client, String parentProjectId, String table, String filter)
+ throws IOException {
+ final List<GenericData.Record> rows = new ArrayList<>();
+ processRowsAtSnapshot(
+ client,
+ parentProjectId,
+ /* table= */ table,
+ /* snapshotInMillis= */ null,
+ /* filter= */ filter,
+ (SimpleRowReaderAvro.AvroRowConsumer)
+ record -> {
+ // clone the record since that reference will be reused by the reader.
+ rows.add(new GenericRecordBuilder(record).build());
+ });
+ return rows;
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderArrow.java
similarity index 87%
rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java
rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderArrow.java
index 685f72fbc9..e4afb9b1ab 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderArrow.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.google.cloud.bigquery.storage.v1.it;
+package com.google.cloud.bigquery.storage.v1.it.util;
import static com.google.common.truth.Truth.assertThat;
@@ -23,7 +23,6 @@
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -50,17 +49,34 @@ public interface ArrowBatchConsumer {
void accept(VectorSchemaRoot root);
}
+ public static class ArrowTimestampBatchConsumer implements ArrowBatchConsumer {
+ private final List<Long> expectedTimestampValues;
+
+ public ArrowTimestampBatchConsumer(List<Long> expectedTimestampValues) {
+ this.expectedTimestampValues = expectedTimestampValues;
+ }
+
+ @Override
+ public void accept(VectorSchemaRoot root) {
+ FieldVector timestampFieldVector = root.getVector("timestamp");
+ int count = timestampFieldVector.getValueCount();
+ for (int i = 0; i < count; i++) {
+ long value = (Long) timestampFieldVector.getObject(i);
+ assertThat(value).isEqualTo(expectedTimestampValues.get(i));
+ }
+ }
+ }
+
/** ArrowRangeBatchConsumer accepts batch Arrow data and validate the range values. */
public static class ArrowRangeBatchConsumer implements ArrowBatchConsumer {
-
- private final ImmutableMap expectedRangeDateValues;
- private final ImmutableMap expectedRangeDatetimeValues;
- private final ImmutableMap expectedRangeTimestampValues;
+ private final Map expectedRangeDateValues;
+ private final Map expectedRangeDatetimeValues;
+ private final Map expectedRangeTimestampValues;
public ArrowRangeBatchConsumer(
- ImmutableMap expectedRangeDateValues,
- ImmutableMap expectedRangeDatetimeValues,
- ImmutableMap expectedRangeTimestampValues) {
+ Map expectedRangeDateValues,
+ Map expectedRangeDatetimeValues,
+ Map expectedRangeTimestampValues) {
this.expectedRangeDateValues = expectedRangeDateValues;
this.expectedRangeDatetimeValues = expectedRangeDatetimeValues;
this.expectedRangeTimestampValues = expectedRangeTimestampValues;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderAvro.java
similarity index 97%
rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java
rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderAvro.java
index a23179c8c8..4914e93f5b 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderAvro.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.google.cloud.bigquery.storage.v1.it;
+package com.google.cloud.bigquery.storage.v1.it.util;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.common.base.Preconditions;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/WriteRetryTestUtil.java
similarity index 99%
rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java
rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/WriteRetryTestUtil.java
index e11e0707df..872fe2dbb8 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/WriteRetryTestUtil.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.google.cloud.bigquery.storage.v1.it;
+package com.google.cloud.bigquery.storage.v1.it.util;
import static org.junit.Assert.assertFalse;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java
index 7fb96b5596..ad8e66d956 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java
@@ -244,7 +244,7 @@ public void testSimpleRead() {
@Test
public void testSimpleReadArrow() {
String table =
- com.google.cloud.bigquery.storage.v1.it.BigQueryResource.formatTableResource(
+ com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource.formatTableResource(
/* projectId= */ "bigquery-public-data",
/* datasetId= */ "samples",
/* tableId= */ "shakespeare");
@@ -308,7 +308,7 @@ public void testRangeType() throws InterruptedException {
bigquery.query(createTable);
String table =
- com.google.cloud.bigquery.storage.v1.it.BigQueryResource.formatTableResource(
+ com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource.formatTableResource(
/* projectId= */ ServiceOptions.getDefaultProjectId(),
/* datasetId= */ DATASET,
/* tableId= */ tableId.getTable());