From da303da1ccae6e5e76bbae4a305bf27b20c921b5 Mon Sep 17 00:00:00 2001
From: Lawrence Qiu
Date: Mon, 5 Jan 2026 17:21:41 -0500
Subject: [PATCH 01/14] chore: Add samples showcasing reading a timestamp value
 from a public dataset

---
 .../ReadTimestampArrowSample.java             | 178 ++++++++++++++++++
 .../ReadTimestampAvroSample.java              | 152 +++++++++++++++
 2 files changed, 330 insertions(+)
 create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java
 create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java

diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java
new file mode 100644
index 0000000000..ff82038f8c
--- /dev/null
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.bigquerystorage;
+
+// [START bigquerystorage_read_timestamp_arrow]
+import com.google.api.gax.rpc.ServerStream;
+import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
+import com.google.cloud.bigquery.storage.v1.ArrowSchema;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Timestamp;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+
+public class ReadTimestampArrowSample {
+  /*
+   * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches transmitted
+   * from the storage API using a generic datum decoder.
+   */
+  private static class SimpleRowReader implements AutoCloseable {
+
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+
+    // The VectorSchemaRoot and VectorLoader below will be reused for every incoming batch to
+    // avoid re-allocation and too much garbage collection.
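+    // root owns one FieldVector per selected column; loader copies each deserialized batch's
+    // buffers into those vectors, so a single allocation can serve the entire stream.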
+    private final VectorSchemaRoot root;
+    private final VectorLoader loader;
+
+    public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
+      Schema schema =
+          MessageSerializer.deserializeSchema(
+              new ReadChannel(
+                  new ByteArrayReadableSeekableByteChannel(
+                      arrowSchema.getSerializedSchema().toByteArray())));
+      Preconditions.checkNotNull(schema);
+      List<FieldVector> vectors = new ArrayList<>();
+      for (Field field : schema.getFields()) {
+        vectors.add(field.createVector(allocator));
+      }
+      root = new VectorSchemaRoot(vectors);
+      loader = new VectorLoader(root);
+    }
+
+    /**
+     * Sample method for processing Arrow data which only validates decoding.
+     *
+     * @param batch object returned from the ReadRowsResponse.
+     */
+    public void processRows(ArrowRecordBatch batch) throws IOException {
+      org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
+          MessageSerializer.deserializeRecordBatch(
+              new ReadChannel(
+                  new ByteArrayReadableSeekableByteChannel(
+                      batch.getSerializedRecordBatch().toByteArray())),
+              allocator);
+
+      loader.load(deserializedBatch);
+      // Release buffers from batch (they are still held in the vectors in root).
+      deserializedBatch.close();
+      System.out.println(root.contentToTSVString());
+      // Release buffers from vectors in root.
+      root.clear();
+    }
+
+    @Override
+    public void close() {
+      root.close();
+      allocator.close();
+    }
+  }
+
+  public static void main(String... args) throws Exception {
+    // Sets your Google Cloud Platform project ID.
+    String projectId = "lawrence-test-project-2";
+// String projectId = args[0];
+    Integer snapshotMillis = null;
+    if (args.length > 1) {
+      snapshotMillis = Integer.parseInt(args[1]);
+    }
+
+    try (BigQueryReadClient client = BigQueryReadClient.create()) {
+      String parent = String.format("projects/%s", projectId);
+
+      // This example uses bike share station data from the public datasets.
+      String srcTable =
+          String.format(
+              "projects/%s/datasets/%s/tables/%s",
+              "bigquery-public-data", "new_york_citibike", "citibike_stations");
+
+      // We specify the columns to be projected by adding them to the selected fields,
+      ReadSession.TableReadOptions options =
+          ReadSession.TableReadOptions.newBuilder()
+              .addSelectedFields("last_reported")
+              .build();
+
+      // Start specifying the read session we want created.
+      ReadSession.Builder sessionBuilder =
+          ReadSession.newBuilder()
+              .setTable(srcTable)
+              // This API can also deliver data serialized in Apache Avro format.
+              // This example leverages Apache Arrow.
+              .setDataFormat(DataFormat.ARROW)
+              .setReadOptions(options);
+
+      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
+      if (snapshotMillis != null) {
+        Timestamp t =
+            Timestamp.newBuilder()
+                .setSeconds(snapshotMillis / 1000)
+                .setNanos(((snapshotMillis % 1000) * 1000000))
+                .build();
+        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
+        sessionBuilder.setTableModifiers(modifiers);
+      }
+
+      // Begin building the session creation request.
+      CreateReadSessionRequest.Builder builder =
+          CreateReadSessionRequest.newBuilder()
+              .setParent(parent)
+              .setReadSession(sessionBuilder)
+              .setMaxStreamCount(1);
+
+      ReadSession session = client.createReadSession(builder.build());
+      // Setup a simple reader and start a read session.
+      try (ReadTimestampArrowSample.SimpleRowReader reader = new ReadTimestampArrowSample.SimpleRowReader(session.getArrowSchema())) {
+
+        // Assert that there are streams available in the session. An empty table may not have
+        // data available. If no sessions are available for an anonymous (cached) table, consider
+        // writing results of a query to a named table rather than consuming cached results
+        // directly.
+        Preconditions.checkState(session.getStreamsCount() > 0);
+
+        // Use the first stream to perform reading.
+        String streamName = session.getStreams(0).getName();
+
+        ReadRowsRequest readRowsRequest =
+            ReadRowsRequest.newBuilder().setReadStream(streamName).build();
+
+        // Process each block of rows as they arrive and decode using our simple row reader.
+        ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
+        for (ReadRowsResponse response : stream) {
+          Preconditions.checkState(response.hasArrowRecordBatch());
+          reader.processRows(response.getArrowRecordBatch());
+        }
+      }
+    }
+  }
+}
+// [END bigquerystorage_read_timestamp_arrow]
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java
new file mode 100644
index 0000000000..fd07d5763b
--- /dev/null
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.bigquerystorage;
+
+// [START bigquerystorage_read_timestamp_avro]
+import com.google.api.gax.rpc.ServerStream;
+import com.google.cloud.bigquery.storage.v1.AvroRows;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Timestamp;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+
+public class ReadTimestampAvroSample {
+  /*
+   * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted
+   * from the storage API using a generic datum decoder.
+   */
+  private static class SimpleRowReader {
+
+    private final DatumReader<GenericRecord> datumReader;
+
+    // Decoder object will be reused to avoid re-allocation and too much garbage collection.
+    private BinaryDecoder decoder = null;
+
+    // GenericRecord object will be reused.
+    private GenericRecord row = null;
+
+    public SimpleRowReader(Schema schema) {
+      Preconditions.checkNotNull(schema);
+      datumReader = new GenericDatumReader<>(schema);
+    }
+
+    /**
+     * Sample method for processing Avro rows which only validates decoding.
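+     *
+     * <p>Reusing the decoder and record instances between calls is safe here: Avro's
+     * read(reuse, decoder) contract permits it and avoids a per-row allocation.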
+     *
+     * @param avroRows object returned from the ReadRowsResponse.
+     */
+    public void processRows(AvroRows avroRows) throws IOException {
+      decoder =
+          DecoderFactory.get()
+              .binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), decoder);
+
+      while (!decoder.isEnd()) {
+        // Reusing object row
+        row = datumReader.read(row, decoder);
+        System.out.println(row.toString());
+      }
+    }
+  }
+
+  public static void main(String... args) throws Exception {
+    // Sets your Google Cloud Platform project ID.
+    String projectId = args[0];
+    Integer snapshotMillis = null;
+    if (args.length > 1) {
+      snapshotMillis = Integer.parseInt(args[1]);
+    }
+
+    try (BigQueryReadClient client = BigQueryReadClient.create()) {
+      String parent = String.format("projects/%s", projectId);
+
+      // This example uses bike share station data from the public datasets.
+      String srcTable =
+          String.format(
+              "projects/%s/datasets/%s/tables/%s",
+              "bigquery-public-data", "new_york_citibike", "citibike_stations");
+
+      // We specify the columns to be projected by adding them to the selected fields,
+      ReadSession.TableReadOptions options =
+          ReadSession.TableReadOptions.newBuilder()
+              .addSelectedFields("last_reported")
+              .build();
+
+      // Start specifying the read session we want created.
+      ReadSession.Builder sessionBuilder =
+          ReadSession.newBuilder()
+              .setTable(srcTable)
+              // This API can also deliver data serialized in Apache Arrow format.
+              // This example leverages Apache Avro.
+              .setDataFormat(DataFormat.AVRO)
+              .setReadOptions(options);
+
+      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
+      if (snapshotMillis != null) {
+        Timestamp t =
+            Timestamp.newBuilder()
+                .setSeconds(snapshotMillis / 1000)
+                .setNanos(((snapshotMillis % 1000) * 1000000))
+                .build();
+        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
+        sessionBuilder.setTableModifiers(modifiers);
+      }
+
+      // Begin building the session creation request.
+      CreateReadSessionRequest.Builder builder =
+          CreateReadSessionRequest.newBuilder()
+              .setParent(parent)
+              .setReadSession(sessionBuilder)
+              .setMaxStreamCount(1);
+
+      // Request the session creation.
+      ReadSession session = client.createReadSession(builder.build());
+
+      SimpleRowReader reader =
+          new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));
+
+      // Assert that there are streams available in the session. An empty table may not have
+      // data available. If no sessions are available for an anonymous (cached) table, consider
+      // writing results of a query to a named table rather than consuming cached results directly.
+      Preconditions.checkState(session.getStreamsCount() > 0);
+
+      // Use the first stream to perform reading.
+      String streamName = session.getStreams(0).getName();
+
+      ReadRowsRequest readRowsRequest =
+          ReadRowsRequest.newBuilder().setReadStream(streamName).build();
+
+      // Process each block of rows as they arrive and decode using our simple row reader.
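+      // Each ReadRowsResponse carries one self-contained block of serialized rows; rows never
+      // span responses, so every block can be decoded independently.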
+ ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasAvroRows()); + reader.processRows(response.getAvroRows()); + } + } + } +} +// [END bigquerystorage_read_timestamp_avro] From e0276579f6a1ac25eea917da10d5eaf52cfa708a Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 5 Jan 2026 19:04:54 -0500 Subject: [PATCH 02/14] docs: Add samples for using timestamps with BQStorage Read and Write API --- samples/snippets/pom.xml | 4 + .../ReadTimestampArrowSample.java | 19 +- .../ReadTimestampAvroSample.java | 4 +- .../WriteToDefaultStreamTimestampJson.java | 296 ++++++++++++++ ...riteToDefaultStreamTimestampWithArrow.java | 364 ++++++++++++++++++ 5 files changed, 678 insertions(+), 9 deletions(-) create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 9d1318f911..213ed53d6c 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -119,6 +119,10 @@ + + com.spotify.fmt + fmt-maven-plugin + org.xolstice.maven.plugins protobuf-maven-plugin diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java index ff82038f8c..cbe2c9fbf3 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java @@ -42,6 +42,14 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; +/** + * Depending on the JDK version, you may need to include this into your VM options: {@code + * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED} + * + *
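+ * <p>The flag is needed because Arrow's off-heap allocator relies on JDK-internal access to
+ * java.nio; without it, newer JDKs typically fail with an InaccessibleObjectException.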

See the {@link documentation} for + * more information. + */ public class ReadTimestampArrowSample { /* * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches transmitted @@ -100,8 +108,8 @@ public void close() { public static void main(String... args) throws Exception { // Sets your Google Cloud Platform project ID. - String projectId = "lawrence-test-project-2"; -// String projectId = args[0]; + String projectId = "lawrence-test-project-2"; + // String projectId = args[0]; Integer snapshotMillis = null; if (args.length > 1) { snapshotMillis = Integer.parseInt(args[1]); @@ -118,9 +126,7 @@ public static void main(String... args) throws Exception { // We specify the columns to be projected by adding them to the selected fields, ReadSession.TableReadOptions options = - ReadSession.TableReadOptions.newBuilder() - .addSelectedFields("last_reported") - .build(); + ReadSession.TableReadOptions.newBuilder().addSelectedFields("last_reported").build(); // Start specifying the read session we want created. ReadSession.Builder sessionBuilder = @@ -151,7 +157,8 @@ public static void main(String... args) throws Exception { ReadSession session = client.createReadSession(builder.build()); // Setup a simple reader and start a read session. - try (ReadTimestampArrowSample.SimpleRowReader reader = new ReadTimestampArrowSample.SimpleRowReader(session.getArrowSchema())) { + try (ReadTimestampArrowSample.SimpleRowReader reader = + new ReadTimestampArrowSample.SimpleRowReader(session.getArrowSchema())) { // Assert that there are streams available in the session. An empty table may not have // data available. If no sessions are available for an anonymous (cached) table, consider diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java index fd07d5763b..0a82eeddf2 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java @@ -92,9 +92,7 @@ public static void main(String... args) throws Exception { // We specify the columns to be projected by adding them to the selected fields, ReadSession.TableReadOptions options = - ReadSession.TableReadOptions.newBuilder() - .addSelectedFields("last_reported") - .build(); + ReadSession.TableReadOptions.newBuilder().addSelectedFields("last_reported").build(); // Start specifying the read session we want created. 
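      // A read session pins a consistent snapshot of the table and expires automatically on the
      // server, so no explicit session cleanup is required.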
ReadSession.Builder sessionBuilder = diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java new file mode 100644 index 0000000000..151e7801d1 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java @@ -0,0 +1,296 @@ +package com.example.bigquerystorage; + +// [START bigquerystorage_timestamp_jsonstreamwriter_default] +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.GuardedBy; +import org.json.JSONArray; +import org.json.JSONObject; +import org.threeten.bp.Duration; + +public class WriteToDefaultStreamTimestampJson { + + public static void runWriteToDefaultStream() + throws Descriptors.DescriptorValidationException, InterruptedException, IOException { + // TODO(developer): Replace these variables before running the sample. + String projectId = "MY_PROJECT_ID"; + String datasetName = "MY_DATASET_NAME"; + String tableName = "MY_TABLE_NAME"; + writeToDefaultStream(projectId, datasetName, tableName); + } + + // Create a JSON object that is compatible with the table schema. + private static JSONObject buildRecord() { + JSONObject record = new JSONObject(); + record.put("timestampField", Instant.now().toString()); + return record; + } + + public static void writeToDefaultStream(String projectId, String datasetName, String tableName) + throws Descriptors.DescriptorValidationException, InterruptedException, IOException { + TableName parentTable = TableName.of(projectId, datasetName, tableName); + + DataWriter writer = new DataWriter(); + // One time initialization for the worker. + writer.initialize(parentTable); + + // Write two batches of fake data to the stream, each with 10 JSON records. Data may be + // batched up to the maximum request size: + // https://cloud.google.com/bigquery/quotas#write-api-limits + for (int i = 0; i < 2; i++) { + JSONArray jsonArr = new JSONArray(); + for (int j = 0; j < 10; j++) { + jsonArr.put(buildRecord()); + } + + writer.append(new AppendContext(jsonArr)); + } + + // Final cleanup for the stream during worker teardown. 
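+    // cleanup() blocks until a response has been received for every in-flight append, so the
+    // row-count verification below observes all 20 rows written above.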
+ writer.cleanup(); + verifyExpectedRowCount(parentTable, 20); + System.out.println("Appended records successfully."); + } + + private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount) + throws InterruptedException { + String queryRowCount = + "SELECT COUNT(*) FROM `" + + parentTable.getProject() + + "." + + parentTable.getDataset() + + "." + + parentTable.getTable() + + "`"; + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + TableResult results = bigquery.query(queryConfig); + int countRowsActual = + Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + if (countRowsActual != expectedRowCount) { + throw new RuntimeException( + "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); + } + } + + private static class AppendContext { + JSONArray data; + + AppendContext(JSONArray data) { + this.data = data; + } + } + + private static class DataWriter { + + private static final int MAX_RECREATE_COUNT = 3; + + private BigQueryWriteClient client; + + // Track the number of in-flight requests to wait for all responses before shutting down. + private final Phaser inflightRequestCount = new Phaser(1); + private final Object lock = new Object(); + private JsonStreamWriter streamWriter; + + @GuardedBy("lock") + private RuntimeException error = null; + + private final AtomicInteger recreateCount = new AtomicInteger(0); + + private JsonStreamWriter createStreamWriter(String tableName) + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + + // Use the JSON stream writer to send records in JSON format. Specify the table name to write + // to the default stream. + // For more information about JsonStreamWriter, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html + return JsonStreamWriter.newBuilder(tableName, client) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .build()) + .setEnableConnectionPool(true) + // This will allow connection pool to scale up better. + .setFlowControlSettings( + FlowControlSettings.newBuilder().setMaxOutstandingElementCount(100L).build()) + // If value is missing in json and there is a default value configured on bigquery + // column, apply the default value to the missing value field. 
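+          // (When this is left unset, missing fields are written as NULL rather than picking up
+          // the column's default value.)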
+ .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setRetrySettings(retrySettings) + .build(); + } + + public void initialize(TableName parentTable) + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Initialize client without settings, internally within stream writer a new client will be + // created with full settings. + client = BigQueryWriteClient.create(); + + streamWriter = createStreamWriter(parentTable.toString()); + } + + public void append(AppendContext appendContext) + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + synchronized (this.lock) { + if (!streamWriter.isUserClosed() + && streamWriter.isClosed() + && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) { + streamWriter = createStreamWriter(streamWriter.getStreamName()); + this.error = null; + } + // If earlier appends have failed, we need to reset before continuing. + if (this.error != null) { + throw this.error; + } + } + // Append asynchronously for increased throughput. + ApiFuture future = streamWriter.append(appendContext.data); + ApiFutures.addCallback( + future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor()); + + // Increase the count of in-flight requests. + inflightRequestCount.register(); + } + + public void cleanup() { + // Wait for all in-flight requests to complete. + inflightRequestCount.arriveAndAwaitAdvance(); + + client.close(); + // Close the connection to the server. + streamWriter.close(); + + // Verify that no error occurred in the stream. + synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + static class AppendCompleteCallback implements ApiFutureCallback { + + private final DataWriter parent; + private final AppendContext appendContext; + + public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) { + this.parent = parent; + this.appendContext = appendContext; + } + + public void onSuccess(AppendRowsResponse response) { + System.out.println("Append success"); + this.parent.recreateCount.set(0); + done(); + } + + public void onFailure(Throwable throwable) { + if (throwable instanceof Exceptions.AppendSerializationError) { + Exceptions.AppendSerializationError ase = (Exceptions.AppendSerializationError) throwable; + Map rowIndexToErrorMessage = ase.getRowIndexToErrorMessage(); + if (!rowIndexToErrorMessage.isEmpty()) { + // Omit the faulty rows + JSONArray dataNew = new JSONArray(); + for (int i = 0; i < appendContext.data.length(); i++) { + if (!rowIndexToErrorMessage.containsKey(i)) { + dataNew.put(appendContext.data.get(i)); + } else { + // process faulty rows by placing them on a dead-letter-queue, for instance + } + } + + // Retry the remaining valid rows, but using a separate thread to + // avoid potentially blocking while we are in a callback. 
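+              // Note: for brevity this sample re-appends inline from the callback; production
+              // code should hand the retry off to a separate executor instead.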
+ if (!dataNew.isEmpty()) { + try { + this.parent.append(new AppendContext(dataNew)); + } catch (DescriptorValidationException | IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + } + + boolean resendRequest = false; + if (throwable instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { + resendRequest = true; + } else if (throwable instanceof Exceptions.StreamWriterClosedException) { + if (!parent.streamWriter.isUserClosed()) { + resendRequest = true; + } + } + if (resendRequest) { + // Retry this request. + try { + this.parent.append(new AppendContext(appendContext.data)); + } catch (Descriptors.DescriptorValidationException + | IOException + | InterruptedException e) { + throw new RuntimeException(e); + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + + synchronized (this.parent.lock) { + if (this.parent.error == null) { + Exceptions.StorageException storageException = Exceptions.toStorageException(throwable); + this.parent.error = + (storageException != null) ? storageException : new RuntimeException(throwable); + } + } + done(); + } + + private void done() { + // Reduce the count of in-flight requests. + this.parent.inflightRequestCount.arriveAndDeregister(); + } + } + } +} +// [END bigquerystorage_timestamp_jsonstreamwriter_default] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java new file mode 100644 index 0000000000..cad7289cbd --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java @@ -0,0 +1,364 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.bigquerystorage; + +// [START bigquerystorage_timestamp_streamwriter_default_arrow] +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.GuardedBy; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.threeten.bp.Duration; + +/** + * This class demonstrates how to ingest data using Arrow format into BigQuery via the default + * stream. It initiates a DataWriter to establish a connection to BigQuery and reuses this + * connection to continuously ingest data. + * + *
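+ * <p>Appends are issued asynchronously; an ApiFutureCallback handles each response and a Phaser
+ * tracks how many requests are still in flight during shutdown.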

Depending on the JDK version, you may need to include this into your VM options: {@code
+ * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED}. See the {@link
+ * documentation} for
+ * more information.
+ */
+public class WriteToDefaultStreamTimestampWithArrow {
+
+  public static void main(String[] args) throws InterruptedException, IOException {
+    if (args.length < 3) {
+      System.out.println(
+          "Usage: WriteToDefaultStreamTimestampWithArrow <projectId> <datasetName> <tableName>");
+      return;
+    }
+    String projectId = args[0];
+    String datasetName = args[1];
+    String tableName = args[2];
+    // For this sample, the table schema should contain a single field:
+    // ['timestampField': TIMESTAMP]
+    writeToDefaultStreamWithArrow(projectId, datasetName, tableName);
+  }
+
+  private static Schema createArrowSchema() {
+    List<Field> fields =
+        ImmutableList.of(
+            new Field(
+                "timestampField",
+                FieldType.nullable(new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")),
+                null));
+    return new Schema(fields, null);
+  }
+
+  // Create an ArrowRecordBatch object that is compatible with the table schema.
+  private static ArrowRecordBatch buildRecordBatch(VectorSchemaRoot root, int rowCount) {
+    TimeStampNanoTZVector timestampField = (TimeStampNanoTZVector) root.getVector("timestampField");
+    timestampField.allocateNew(rowCount);
+
+    for (int i = 0; i < rowCount; i++) {
+      // Store the full epoch-nanosecond value; Instant.getNano() alone would drop the seconds
+      // component and produce timestamps near the epoch.
+      Instant now = Instant.now();
+      timestampField.set(i, now.getEpochSecond() * 1_000_000_000L + now.getNano());
+    }
+    root.setRowCount(rowCount);
+
+    CompressionCodec codec =
+        NoCompressionCodec.Factory.INSTANCE.createCodec(CompressionUtil.CodecType.NO_COMPRESSION);
+    VectorUnloader vectorUnloader =
+        new VectorUnloader(root, /* includeNullCount= */ true, codec, /* alignBuffers= */ true);
+    return vectorUnloader.getRecordBatch();
+  }
+
+  public static void writeToDefaultStreamWithArrow(
+      String projectId, String datasetName, String tableName)
+      throws InterruptedException, IOException {
+    TableName parentTable = TableName.of(projectId, datasetName, tableName);
+    Schema arrowSchema = createArrowSchema();
+    DataWriter writer = new DataWriter();
+    // One time initialization for the worker.
+    writer.initialize(parentTable, arrowSchema);
+    long initialRowCount = getRowCount(parentTable);
+    BufferAllocator allocator = new RootAllocator();
+
+    // A writer should be used to ingest as much data as possible before teardown.
+    // Append 100 batches.
+    for (int i = 0; i < 100; i++) {
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+        // Each batch has 10 rows.
+        ArrowRecordBatch batch = buildRecordBatch(root, 10);
+
+        // Asynchronous append.
+        writer.append(new ArrowData(arrowSchema, batch));
+      }
+    }
+    // Final cleanup for the stream during worker teardown.
+    // This blocks until responses for all append requests have been received.
+    writer.cleanup();
+
+    verifyExpectedRowCount(parentTable, initialRowCount + 1000);
+    System.out.println("Appended records successfully.");
+  }
+
+  private static long getRowCount(TableName parentTable) throws InterruptedException {
+    String queryRowCount =
+        "SELECT COUNT(*) FROM `"
+            + parentTable.getProject()
+            + "."
+            + parentTable.getDataset()
+            + "."
+ + parentTable.getTable() + + "`"; + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); + BigQuery bigquery = + BigQueryOptions.newBuilder().setProjectId(parentTable.getProject()).build().getService(); + TableResult results = bigquery.query(queryConfig); + return Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); + } + + private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount) + throws InterruptedException { + String queryRowCount = + "SELECT COUNT(*) FROM `" + + parentTable.getProject() + + "." + + parentTable.getDataset() + + "." + + parentTable.getTable() + + "`"; + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); + BigQuery bigquery = + BigQueryOptions.newBuilder().setProjectId(parentTable.getProject()).build().getService(); + TableResult results = bigquery.query(queryConfig); + int countRowsActual = + Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + if (countRowsActual != expectedRowCount) { + throw new RuntimeException( + "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); + } + } + + private static class ArrowData { + Schema arrowSchema; + ArrowRecordBatch data; + + ArrowData(Schema arrowSchema, ArrowRecordBatch data) { + this.arrowSchema = arrowSchema; + this.data = data; + } + } + + private static class DataWriter { + + private static final int MAX_RECREATE_COUNT = 3; + + private BigQueryWriteClient client; + + // Track the number of in-flight requests to wait for all responses before shutting down. + private final Phaser inflightRequestCount = new Phaser(1); + private final Object lock = new Object(); + + private Schema arrowSchema; + private StreamWriter streamWriter; + + @GuardedBy("lock") + private RuntimeException error = null; + + private final AtomicInteger recreateCount = new AtomicInteger(0); + + private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) + throws IOException { + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + + // Use the Stream writer to send records in Arrow format. Specify the table name to write + // to the default stream. + // For more information about StreamWriter, see: + // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.StreamWriter + return StreamWriter.newBuilder(streamName, client) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .build()) + .setEnableConnectionPool(true) + // If value is missing in ArrowRecordBatch and there is a default value configured on + // bigquery column, apply the default value to the missing value field. 
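+        // (As with the JSON writer, leaving this unset writes NULL for missing fields instead
+        // of applying the column default.)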
+ .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setMaxRetryDuration(java.time.Duration.ofSeconds(5)) + // Set the StreamWriter with Arrow Schema, this would only allow the StreamWriter to + // append data in Arrow format. + .setWriterSchema(arrowSchema) + .setRetrySettings(retrySettings) + .build(); + } + + public void initialize(TableName parentTable, Schema arrowSchema) throws IOException { + // Initialize client without settings, internally within stream writer a new client will be + // created with full settings. + client = BigQueryWriteClient.create(); + + streamWriter = createStreamWriter(parentTable.toString() + "/_default", arrowSchema); + } + + public void append(ArrowData arrowData) throws IOException { + synchronized (this.lock) { + if (!streamWriter.isUserClosed() + && streamWriter.isClosed() + && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) { + streamWriter = createStreamWriter(streamWriter.getStreamName(), arrowData.arrowSchema); + this.error = null; + } + // If earlier appends have failed, we need to reset before continuing. + if (this.error != null) { + throw this.error; + } + } + // Append asynchronously for increased throughput. + ApiFuture future = streamWriter.append(arrowData.data); + ApiFutures.addCallback( + future, new AppendCompleteCallback(this, arrowData), MoreExecutors.directExecutor()); + + // Increase the count of in-flight requests. + inflightRequestCount.register(); + } + + public void cleanup() { + // Wait for all in-flight requests to complete. + inflightRequestCount.arriveAndAwaitAdvance(); + + client.close(); + // Close the connection to the server. + streamWriter.close(); + + // Verify that no error occurred in the stream. + synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + static class AppendCompleteCallback implements ApiFutureCallback { + + private final DataWriter parent; + private final ArrowData arrowData; + + public AppendCompleteCallback(DataWriter parent, ArrowData arrowData) { + this.parent = parent; + this.arrowData = arrowData; + } + + public void onSuccess(AppendRowsResponse response) { + System.out.println("Append success"); + this.parent.recreateCount.set(0); + done(); + } + + public void onFailure(Throwable throwable) { + System.out.println("Append failed: " + throwable.toString()); + if (throwable instanceof Exceptions.AppendSerializationError) { + Exceptions.AppendSerializationError ase = (Exceptions.AppendSerializationError) throwable; + Map rowIndexToErrorMessage = ase.getRowIndexToErrorMessage(); + if (!rowIndexToErrorMessage.isEmpty()) { + System.out.println("row level errors: " + rowIndexToErrorMessage); + // The append returned failure with indices for faulty rows. + // Fix the faulty rows or remove them from the appended data and retry the append. + done(); + return; + } + } + + boolean resendRequest = false; + if (throwable instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { + resendRequest = true; + } else if (throwable instanceof Exceptions.StreamWriterClosedException) { + if (!parent.streamWriter.isUserClosed()) { + resendRequest = true; + } + } + if (resendRequest) { + // Retry this request. 
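+          // The callback still holds the original ArrowRecordBatch, so the same payload can be
+          // resubmitted without rebuilding the vectors.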
+ try { + this.parent.append(new ArrowData(arrowData.arrowSchema, arrowData.data)); + } catch (IOException e) { + throw new RuntimeException(e); + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + + synchronized (this.parent.lock) { + if (this.parent.error == null) { + Exceptions.StorageException storageException = Exceptions.toStorageException(throwable); + this.parent.error = + (storageException != null) ? storageException : new RuntimeException(throwable); + } + } + done(); + } + + private void done() { + // Reduce the count of in-flight requests. + this.parent.inflightRequestCount.arriveAndDeregister(); + } + } + } +} +// [END bigquerystorage_timestamp_streamwriter_default_arrow] From e87134f0231082c2162351a34f00ccc2f52a8434 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 5 Jan 2026 19:06:00 -0500 Subject: [PATCH 03/14] chore: Add missing header --- .../WriteToDefaultStreamTimestampJson.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java index 151e7801d1..aeb38e5648 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java @@ -1,3 +1,18 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package com.example.bigquerystorage; // [START bigquerystorage_timestamp_jsonstreamwriter_default] From 0776f8047ef9d997d0b36cb66a2e19870d5ab41f Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 5 Jan 2026 19:09:45 -0500 Subject: [PATCH 04/14] chore: Remove fmt plugin in samples --- samples/snippets/pom.xml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 213ed53d6c..9d1318f911 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -119,10 +119,6 @@ - - com.spotify.fmt - fmt-maven-plugin - org.xolstice.maven.plugins protobuf-maven-plugin From b12bb5c2a8c16e8c46ecb187d0a7f0bd1c209b8c Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 5 Jan 2026 19:42:49 -0500 Subject: [PATCH 05/14] chore: Fix samples lint issues --- .../example/bigquerystorage/ReadTimestampArrowSample.java | 6 +++--- .../example/bigquerystorage/ReadTimestampAvroSample.java | 1 + .../bigquerystorage/WriteToDefaultStreamTimestampJson.java | 1 + .../WriteToDefaultStreamTimestampWithArrow.java | 6 +++--- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java index cbe2c9fbf3..1637a88229 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.example.bigquerystorage; // [START bigquerystorage_read_timestamp_arrow] @@ -46,9 +47,8 @@ * Depending on the JDK version, you may need to include this into your VM options: {@code * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED} * - *

See the {@link documentation} for - * more information. + *

See the documentation + * for more information. */ public class ReadTimestampArrowSample { /* diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java index 0a82eeddf2..d3cb7c21d6 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.example.bigquerystorage; // [START bigquerystorage_read_timestamp_avro] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java index aeb38e5648..8dabb34e1c 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.example.bigquerystorage; // [START bigquerystorage_timestamp_jsonstreamwriter_default] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java index cad7289cbd..f3134e02e8 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java @@ -65,9 +65,9 @@ * connection to continuously ingest data. * *

Depending on the JDK version, you may need to include this into your VM options: {@code - * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED}. See the {@link documentation} for - * more information. + * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED}. See the + * documentation + * for more information. */ public class WriteToDefaultStreamTimestampWithArrow { From 32dd2f18251b1b7ceb8d9f4b340329bdb9600b8a Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 6 Jan 2026 11:41:23 -0500 Subject: [PATCH 06/14] chore: Add ITs for the added samples --- ...rowSample.java => ReadTimestampArrow.java} | 14 +- ...AvroSample.java => ReadTimestampAvro.java} | 2 +- .../bigquerystorage/WriteNestedProto.java | 146 ++++++------ ...riteToDefaultStreamTimestampWithArrow.java | 6 +- .../bigquerystorage/ReadTimestampArrowIT.java | 55 +++++ .../bigquerystorage/ReadTimestampAvroIT.java | 54 +++++ .../bigquerystorage/WriteNestedProtoIT.java | 219 +++++++++--------- .../WriteToDefaultStreamTimestampJsonIT.java | 96 ++++++++ ...teToDefaultStreamTimestampWithArrowIT.java | 96 ++++++++ 9 files changed, 495 insertions(+), 193 deletions(-) rename samples/snippets/src/main/java/com/example/bigquerystorage/{ReadTimestampArrowSample.java => ReadTimestampArrow.java} (94%) rename samples/snippets/src/main/java/com/example/bigquerystorage/{ReadTimestampAvroSample.java => ReadTimestampAvro.java} (99%) create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampArrowIT.java create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampAvroIT.java create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJsonIT.java create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrowIT.java diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java similarity index 94% rename from samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java rename to samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java index 1637a88229..9df770d106 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrowSample.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java @@ -47,10 +47,11 @@ * Depending on the JDK version, you may need to include this into your VM options: {@code * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED} * - *

See the documentation - * for more information. + *

See the documentation for + * more information. */ -public class ReadTimestampArrowSample { +public class ReadTimestampArrow { /* * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches transmitted * from the storage API using a generic datum decoder. @@ -108,8 +109,7 @@ public void close() { public static void main(String... args) throws Exception { // Sets your Google Cloud Platform project ID. - String projectId = "lawrence-test-project-2"; - // String projectId = args[0]; + String projectId = args[0]; Integer snapshotMillis = null; if (args.length > 1) { snapshotMillis = Integer.parseInt(args[1]); @@ -157,8 +157,8 @@ public static void main(String... args) throws Exception { ReadSession session = client.createReadSession(builder.build()); // Setup a simple reader and start a read session. - try (ReadTimestampArrowSample.SimpleRowReader reader = - new ReadTimestampArrowSample.SimpleRowReader(session.getArrowSchema())) { + try (ReadTimestampArrow.SimpleRowReader reader = + new ReadTimestampArrow.SimpleRowReader(session.getArrowSchema())) { // Assert that there are streams available in the session. An empty table may not have // data available. If no sessions are available for an anonymous (cached) table, consider diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java similarity index 99% rename from samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java rename to samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java index d3cb7c21d6..5b37364beb 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvroSample.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java @@ -36,7 +36,7 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -public class ReadTimestampAvroSample { +public class ReadTimestampAvro { /* * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted * from the storage API using a generic datum decoder. diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java index 92ab52724c..4615691da9 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java @@ -1,73 +1,73 @@ -/* - * Copyright 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.example.bigquerystorage; - -// [START bigquerystorage_writenestedproto] -import com.google.api.core.ApiFuture; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1.ProtoRows; -import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter; -import com.google.cloud.bigquery.storage.v1.StreamWriter; -import com.google.protobuf.Descriptors.DescriptorValidationException; -import java.io.IOException; -import java.util.concurrent.ExecutionException; - -public class WriteNestedProto { - - public static void runWriteNestedProto(String projectId, String datasetName, String tableName) - throws DescriptorValidationException, InterruptedException, IOException { - StreamWriter streamWriter = - StreamWriter.newBuilder( - "projects/" - + projectId - + "/datasets/" - + datasetName - + "/tables/" - + tableName - + "/_default") - .setWriterSchema(ProtoSchemaConverter.convert(HasNestedMessage.getDescriptor())) - .build(); - ProtoRows protoRows = - ProtoRows.newBuilder() - .addSerializedRows( - HasNestedMessage.newBuilder() - .setFoo("foo") - .setBar( - HasNestedMessage.InnerMessage.newBuilder() - .setMyInt(12345) - .setMyString("bar") - .build()) - .build() - .toByteString()) - .addSerializedRows( - HasSeparateNestedMessage.newBuilder() - .setFoo("foo2") - .setBar( - SeparateMessage.newBuilder().setMyInt(123456).setMyString("bar2").build()) - .build() - .toByteString()) - .build(); - ApiFuture future = streamWriter.append(protoRows); - try { - AppendRowsResponse response = future.get(); - System.out.println("Appended records successfully."); - } catch (ExecutionException e) { - System.out.println(e); - } - } -} -// [END bigquerystorage_writenestedproto] +/// * +// * Copyright 2025 Google LLC +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. 
+// */ +// +// package com.example.bigquerystorage; +// +//// [START bigquerystorage_writenestedproto] +// import com.google.api.core.ApiFuture; +// import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +// import com.google.cloud.bigquery.storage.v1.ProtoRows; +// import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter; +// import com.google.cloud.bigquery.storage.v1.StreamWriter; +// import com.google.protobuf.Descriptors.DescriptorValidationException; +// import java.io.IOException; +// import java.util.concurrent.ExecutionException; +// +// public class WriteNestedProto { +// +// public static void runWriteNestedProto(String projectId, String datasetName, String tableName) +// throws DescriptorValidationException, InterruptedException, IOException { +// StreamWriter streamWriter = +// StreamWriter.newBuilder( +// "projects/" +// + projectId +// + "/datasets/" +// + datasetName +// + "/tables/" +// + tableName +// + "/_default") +// .setWriterSchema(ProtoSchemaConverter.convert(HasNestedMessage.getDescriptor())) +// .build(); +// ProtoRows protoRows = +// ProtoRows.newBuilder() +// .addSerializedRows( +// HasNestedMessage.newBuilder() +// .setFoo("foo") +// .setBar( +// HasNestedMessage.InnerMessage.newBuilder() +// .setMyInt(12345) +// .setMyString("bar") +// .build()) +// .build() +// .toByteString()) +// .addSerializedRows( +// HasSeparateNestedMessage.newBuilder() +// .setFoo("foo2") +// .setBar( +// SeparateMessage.newBuilder().setMyInt(123456).setMyString("bar2").build()) +// .build() +// .toByteString()) +// .build(); +// ApiFuture future = streamWriter.append(protoRows); +// try { +// AppendRowsResponse response = future.get(); +// System.out.println("Appended records successfully."); +// } catch (ExecutionException e) { +// System.out.println(e); +// } +// } +// } +//// [END bigquerystorage_writenestedproto] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java index f3134e02e8..a7611da662 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java @@ -65,9 +65,9 @@ * connection to continuously ingest data. * *

Depending on the JDK version, you may need to include this into your VM options: {@code - * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED}. See the - * documentation - * for more information. + * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED}. See the documentation for + * more information. */ public class WriteToDefaultStreamTimestampWithArrow { diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampArrowIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampArrowIT.java new file mode 100644 index 0000000000..f8f428dd90 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampArrowIT.java @@ -0,0 +1,55 @@ +/* + * Copyright 2026 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ReadTimestampArrowIT { + private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + private PrintStream out; + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + } + + @After + public void tearDown() { + System.setOut(null); + } + + @Test + public void testQuickstart() throws Exception { + ReadTimestampArrow.main(PROJECT_ID); + String got = bout.toString(); + // Ensure that `last_reported` column is in the output + assertThat(got).contains("last_reported"); + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampAvroIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampAvroIT.java new file mode 100644 index 0000000000..ae7420fab6 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampAvroIT.java @@ -0,0 +1,54 @@ +/* + * Copyright 2026 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ReadTimestampAvroIT { + private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bout); + System.setOut(out); + } + + @After + public void tearDown() { + System.setOut(null); + } + + @Test + public void testReadTimestampAvro() throws Exception { + ReadTimestampAvro.main(PROJECT_ID); + String got = bout.toString(); + // Ensure that `last_reported` column is in the output + assertThat(got).contains("last_reported"); + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java index 6293b301a7..854ae6c6f7 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java @@ -1,109 +1,110 @@ -/* - * Copyright 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.example.bigquerystorage; - -import static com.google.common.truth.Truth.assertThat; -import static junit.framework.TestCase.assertNotNull; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; -import com.google.cloud.bigquery.BigQueryOptions; -import com.google.cloud.bigquery.DatasetId; -import com.google.cloud.bigquery.DatasetInfo; -import com.google.cloud.bigquery.FieldList; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.TableInfo; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.UUID; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class WriteNestedProtoIT { - - private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); - - private ByteArrayOutputStream bout; - private PrintStream out; - private BigQuery bigquery; - private String datasetName; - private String tableName; - - private static void requireEnvVar(String varName) { - assertNotNull( - "Environment variable " + varName + " is required to perform these tests.", - System.getenv(varName)); - } - - @BeforeClass - public static void checkRequirements() { - requireEnvVar("GOOGLE_CLOUD_PROJECT"); - } - - @Before - public void setUp() { - bout = new ByteArrayOutputStream(); - out = new PrintStream(bout); - System.setOut(out); - - bigquery = BigQueryOptions.getDefaultInstance().getService(); - - // Create a new dataset and table for each test. - datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); - tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); - Schema schema = - Schema.of( - com.google.cloud.bigquery.Field.newBuilder("foo", StandardSQLTypeName.STRING).build(), - com.google.cloud.bigquery.Field.newBuilder( - "bar", - StandardSQLTypeName.STRUCT, - FieldList.of( - com.google.cloud.bigquery.Field.newBuilder( - "my_int", StandardSQLTypeName.INT64) - .build(), - com.google.cloud.bigquery.Field.newBuilder( - "my_string", StandardSQLTypeName.STRING) - .build())) - .build()); - bigquery.create(DatasetInfo.newBuilder(datasetName).build()); - TableInfo tableInfo = - TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) - .build(); - bigquery.create(tableInfo); - } - - @After - public void tearDown() { - bigquery.delete( - DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); - System.setOut(null); - } - - @Test - public void testWriteNestedProto() throws Exception { - WriteNestedProto.runWriteNestedProto(GOOGLE_CLOUD_PROJECT, datasetName, tableName); - assertThat(bout.toString()).contains("Appended records successfully."); - } -} +/// * +// * Copyright 2025 Google LLC +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. 
+// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +// package com.example.bigquerystorage; +// +// import static com.google.common.truth.Truth.assertThat; +// import static junit.framework.TestCase.assertNotNull; +// +// import com.google.cloud.bigquery.BigQuery; +// import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +// import com.google.cloud.bigquery.BigQueryOptions; +// import com.google.cloud.bigquery.DatasetId; +// import com.google.cloud.bigquery.DatasetInfo; +// import com.google.cloud.bigquery.FieldList; +// import com.google.cloud.bigquery.Schema; +// import com.google.cloud.bigquery.StandardSQLTypeName; +// import com.google.cloud.bigquery.StandardTableDefinition; +// import com.google.cloud.bigquery.TableId; +// import com.google.cloud.bigquery.TableInfo; +// import java.io.ByteArrayOutputStream; +// import java.io.PrintStream; +// import java.util.UUID; +// import org.junit.After; +// import org.junit.Before; +// import org.junit.BeforeClass; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.junit.runners.JUnit4; +// +// @RunWith(JUnit4.class) +// public class WriteNestedProtoIT { +// +// private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); +// +// private ByteArrayOutputStream bout; +// private PrintStream out; +// private BigQuery bigquery; +// private String datasetName; +// private String tableName; +// +// private static void requireEnvVar(String varName) { +// assertNotNull( +// "Environment variable " + varName + " is required to perform these tests.", +// System.getenv(varName)); +// } +// +// @BeforeClass +// public static void checkRequirements() { +// requireEnvVar("GOOGLE_CLOUD_PROJECT"); +// } +// +// @Before +// public void setUp() { +// bout = new ByteArrayOutputStream(); +// out = new PrintStream(bout); +// System.setOut(out); +// +// bigquery = BigQueryOptions.getDefaultInstance().getService(); +// +// // Create a new dataset and table for each test. 
+// datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); +// tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); +// Schema schema = +// Schema.of( +// com.google.cloud.bigquery.Field.newBuilder("foo", StandardSQLTypeName.STRING).build(), +// com.google.cloud.bigquery.Field.newBuilder( +// "bar", +// StandardSQLTypeName.STRUCT, +// FieldList.of( +// com.google.cloud.bigquery.Field.newBuilder( +// "my_int", StandardSQLTypeName.INT64) +// .build(), +// com.google.cloud.bigquery.Field.newBuilder( +// "my_string", StandardSQLTypeName.STRING) +// .build())) +// .build()); +// bigquery.create(DatasetInfo.newBuilder(datasetName).build()); +// TableInfo tableInfo = +// TableInfo.newBuilder(TableId.of(datasetName, tableName), +// StandardTableDefinition.of(schema)) +// .build(); +// bigquery.create(tableInfo); +// } +// +// @After +// public void tearDown() { +// bigquery.delete( +// DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); +// System.setOut(null); +// } +// +// @Test +// public void testWriteNestedProto() throws Exception { +// WriteNestedProto.runWriteNestedProto(GOOGLE_CLOUD_PROJECT, datasetName, tableName); +// assertThat(bout.toString()).contains("Appended records successfully."); +// } +// } diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJsonIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJsonIT.java new file mode 100644 index 0000000000..0e5e8bcf80 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJsonIT.java @@ -0,0 +1,96 @@ +/* + * Copyright 2026 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.example.bigquerystorage;
+
+import static com.google.common.truth.Truth.assertThat;
+import static junit.framework.TestCase.assertNotNull;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class WriteToDefaultStreamTimestampJsonIT {
+  private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
+
+  private ByteArrayOutputStream bout;
+  private BigQuery bigquery;
+  private String datasetName;
+  private String tableName;
+
+  private static void requireEnvVar() {
+    assertNotNull(
+        "Environment variable GOOGLE_CLOUD_PROJECT is required to perform these tests.",
+        System.getenv("GOOGLE_CLOUD_PROJECT"));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnvVar();
+  }
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bout);
+    System.setOut(out);
+
+    bigquery = BigQueryOptions.getDefaultInstance().getService();
+
+    // Create a new dataset and table for each test.
+    datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
+    tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
+    Schema schema =
+        Schema.of(Field.newBuilder("timestampField", StandardSQLTypeName.TIMESTAMP).build());
+    bigquery.create(DatasetInfo.newBuilder(datasetName).build());
+    TableInfo tableInfo =
+        TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
+            .build();
+    bigquery.create(tableInfo);
+  }
+
+  @After
+  public void tearDown() {
+    bigquery.delete(
+        DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName),
+        BigQuery.DatasetDeleteOption.deleteContents());
+    System.setOut(null);
+  }
+
+  @Test
+  public void testWriteToDefaultStream() throws Exception {
+    WriteToDefaultStreamTimestampJson.writeToDefaultStream(
+        GOOGLE_CLOUD_PROJECT, datasetName, tableName);
+    assertThat(bout.toString()).contains("Appended records successfully.");
+  }
+}
diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrowIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrowIT.java
new file mode 100644
index 0000000000..2561de9859
--- /dev/null
+++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrowIT.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2026 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WriteToDefaultStreamTimestampWithArrowIT { + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + private BigQuery bigquery; + private String datasetName; + private String tableName; + + private static void requireEnvVar() { + assertNotNull( + "Environment variable GOOGLE_CLOUD_PROJECT is required to perform these tests.", + System.getenv("GOOGLE_CLOUD_PROJECT")); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar(); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bout); + System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. 
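+    // The random UUID suffix keeps concurrent test runs from colliding on
+    // dataset and table names in the shared test project.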
+    datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
+    tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
+    Schema schema =
+        Schema.of(Field.newBuilder("timestampField", StandardSQLTypeName.TIMESTAMP).build());
+    bigquery.create(DatasetInfo.newBuilder(datasetName).build());
+    TableInfo tableInfo =
+        TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
+            .build();
+    bigquery.create(tableInfo);
+  }
+
+  @After
+  public void tearDown() {
+    bigquery.delete(
+        DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName),
+        BigQuery.DatasetDeleteOption.deleteContents());
+    System.setOut(null);
+  }
+
+  @Test
+  public void testWriteToDefaultStream() throws Exception {
+    WriteToDefaultStreamTimestampWithArrow.writeToDefaultStreamWithArrow(
+        GOOGLE_CLOUD_PROJECT, datasetName, tableName);
+    assertThat(bout.toString()).contains("Appended records successfully.");
+  }
+}

From b840e7c90d2f560b24b688f4f56479f8ea2252e6 Mon Sep 17 00:00:00 2001
From: Lawrence Qiu 
Date: Tue, 6 Jan 2026 11:44:30 -0500
Subject: [PATCH 07/14] chore: Fix commented out nested samples

---
 .../bigquerystorage/WriteNestedProto.java     | 146 ++++++------
 .../bigquerystorage/WriteNestedProtoIT.java   | 219 +++++++++---------
 2 files changed, 182 insertions(+), 183 deletions(-)

diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java
index 4615691da9..55dc96a8b6 100644
--- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java
@@ -1,73 +1,73 @@
-/// *
-// * Copyright 2025 Google LLC
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */ -// -// package com.example.bigquerystorage; -// -//// [START bigquerystorage_writenestedproto] -// import com.google.api.core.ApiFuture; -// import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; -// import com.google.cloud.bigquery.storage.v1.ProtoRows; -// import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter; -// import com.google.cloud.bigquery.storage.v1.StreamWriter; -// import com.google.protobuf.Descriptors.DescriptorValidationException; -// import java.io.IOException; -// import java.util.concurrent.ExecutionException; -// -// public class WriteNestedProto { -// -// public static void runWriteNestedProto(String projectId, String datasetName, String tableName) -// throws DescriptorValidationException, InterruptedException, IOException { -// StreamWriter streamWriter = -// StreamWriter.newBuilder( -// "projects/" -// + projectId -// + "/datasets/" -// + datasetName -// + "/tables/" -// + tableName -// + "/_default") -// .setWriterSchema(ProtoSchemaConverter.convert(HasNestedMessage.getDescriptor())) -// .build(); -// ProtoRows protoRows = -// ProtoRows.newBuilder() -// .addSerializedRows( -// HasNestedMessage.newBuilder() -// .setFoo("foo") -// .setBar( -// HasNestedMessage.InnerMessage.newBuilder() -// .setMyInt(12345) -// .setMyString("bar") -// .build()) -// .build() -// .toByteString()) -// .addSerializedRows( -// HasSeparateNestedMessage.newBuilder() -// .setFoo("foo2") -// .setBar( -// SeparateMessage.newBuilder().setMyInt(123456).setMyString("bar2").build()) -// .build() -// .toByteString()) -// .build(); -// ApiFuture future = streamWriter.append(protoRows); -// try { -// AppendRowsResponse response = future.get(); -// System.out.println("Appended records successfully."); -// } catch (ExecutionException e) { -// System.out.println(e); -// } -// } -// } -//// [END bigquerystorage_writenestedproto] +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.example.bigquerystorage;
+
+// [START bigquerystorage_writenestedproto]
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
+import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+public class WriteNestedProto {
+
+  public static void runWriteNestedProto(String projectId, String datasetName, String tableName)
+      throws DescriptorValidationException, InterruptedException, IOException {
+    StreamWriter streamWriter =
+        StreamWriter.newBuilder(
+                "projects/"
+                    + projectId
+                    + "/datasets/"
+                    + datasetName
+                    + "/tables/"
+                    + tableName
+                    + "/_default")
+            .setWriterSchema(ProtoSchemaConverter.convert(HasNestedMessage.getDescriptor()))
+            .build();
+    ProtoRows protoRows =
+        ProtoRows.newBuilder()
+            .addSerializedRows(
+                HasNestedMessage.newBuilder()
+                    .setFoo("foo")
+                    .setBar(
+                        HasNestedMessage.InnerMessage.newBuilder()
+                            .setMyInt(12345)
+                            .setMyString("bar")
+                            .build())
+                    .build()
+                    .toByteString())
+            .addSerializedRows(
+                HasSeparateNestedMessage.newBuilder()
+                    .setFoo("foo2")
+                    .setBar(
+                        SeparateMessage.newBuilder().setMyInt(123456).setMyString("bar2").build())
+                    .build()
+                    .toByteString())
+            .build();
+    ApiFuture<AppendRowsResponse> future = streamWriter.append(protoRows);
+    try {
+      AppendRowsResponse response = future.get();
+      System.out.println("Appended records successfully.");
+    } catch (ExecutionException e) {
+      System.out.println(e);
+    }
+  }
+}
+// [END bigquerystorage_writenestedproto]
\ No newline at end of file
diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java
index 854ae6c6f7..b9423ccbea 100644
--- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java
+++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java
@@ -1,110 +1,109 @@
-/// *
-// * Copyright 2025 Google LLC
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// */ -// -// package com.example.bigquerystorage; -// -// import static com.google.common.truth.Truth.assertThat; -// import static junit.framework.TestCase.assertNotNull; -// -// import com.google.cloud.bigquery.BigQuery; -// import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; -// import com.google.cloud.bigquery.BigQueryOptions; -// import com.google.cloud.bigquery.DatasetId; -// import com.google.cloud.bigquery.DatasetInfo; -// import com.google.cloud.bigquery.FieldList; -// import com.google.cloud.bigquery.Schema; -// import com.google.cloud.bigquery.StandardSQLTypeName; -// import com.google.cloud.bigquery.StandardTableDefinition; -// import com.google.cloud.bigquery.TableId; -// import com.google.cloud.bigquery.TableInfo; -// import java.io.ByteArrayOutputStream; -// import java.io.PrintStream; -// import java.util.UUID; -// import org.junit.After; -// import org.junit.Before; -// import org.junit.BeforeClass; -// import org.junit.Test; -// import org.junit.runner.RunWith; -// import org.junit.runners.JUnit4; -// -// @RunWith(JUnit4.class) -// public class WriteNestedProtoIT { -// -// private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); -// -// private ByteArrayOutputStream bout; -// private PrintStream out; -// private BigQuery bigquery; -// private String datasetName; -// private String tableName; -// -// private static void requireEnvVar(String varName) { -// assertNotNull( -// "Environment variable " + varName + " is required to perform these tests.", -// System.getenv(varName)); -// } -// -// @BeforeClass -// public static void checkRequirements() { -// requireEnvVar("GOOGLE_CLOUD_PROJECT"); -// } -// -// @Before -// public void setUp() { -// bout = new ByteArrayOutputStream(); -// out = new PrintStream(bout); -// System.setOut(out); -// -// bigquery = BigQueryOptions.getDefaultInstance().getService(); -// -// // Create a new dataset and table for each test. -// datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); -// tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); -// Schema schema = -// Schema.of( -// com.google.cloud.bigquery.Field.newBuilder("foo", StandardSQLTypeName.STRING).build(), -// com.google.cloud.bigquery.Field.newBuilder( -// "bar", -// StandardSQLTypeName.STRUCT, -// FieldList.of( -// com.google.cloud.bigquery.Field.newBuilder( -// "my_int", StandardSQLTypeName.INT64) -// .build(), -// com.google.cloud.bigquery.Field.newBuilder( -// "my_string", StandardSQLTypeName.STRING) -// .build())) -// .build()); -// bigquery.create(DatasetInfo.newBuilder(datasetName).build()); -// TableInfo tableInfo = -// TableInfo.newBuilder(TableId.of(datasetName, tableName), -// StandardTableDefinition.of(schema)) -// .build(); -// bigquery.create(tableInfo); -// } -// -// @After -// public void tearDown() { -// bigquery.delete( -// DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); -// System.setOut(null); -// } -// -// @Test -// public void testWriteNestedProto() throws Exception { -// WriteNestedProto.runWriteNestedProto(GOOGLE_CLOUD_PROJECT, datasetName, tableName); -// assertThat(bout.toString()).contains("Appended records successfully."); -// } -// } +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WriteNestedProtoIT { + + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + private PrintStream out; + private BigQuery bigquery; + private String datasetName; + private String tableName; + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT"); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. 
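+    // The table schema mirrors the nested proto payload: a top-level STRING field
+    // plus a STRUCT holding the inner message's INT64 and STRING fields.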
+ datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + Schema schema = + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("foo", StandardSQLTypeName.STRING).build(), + com.google.cloud.bigquery.Field.newBuilder( + "bar", + StandardSQLTypeName.STRUCT, + FieldList.of( + com.google.cloud.bigquery.Field.newBuilder( + "my_int", StandardSQLTypeName.INT64) + .build(), + com.google.cloud.bigquery.Field.newBuilder( + "my_string", StandardSQLTypeName.STRING) + .build())) + .build()); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + TableInfo tableInfo = + TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) + .build(); + bigquery.create(tableInfo); + } + + @After + public void tearDown() { + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); + System.setOut(null); + } + + @Test + public void testWriteNestedProto() throws Exception { + WriteNestedProto.runWriteNestedProto(GOOGLE_CLOUD_PROJECT, datasetName, tableName); + assertThat(bout.toString()).contains("Appended records successfully."); + } +} \ No newline at end of file From 71a451383da9c729ec2d1b1dd773af46a67c9c7b Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 6 Jan 2026 14:12:42 -0500 Subject: [PATCH 08/14] chore: Address code comments --- .../com/example/bigquerystorage/ExportOpenTelemetry.java | 2 +- .../com/example/bigquerystorage/ReadTimestampArrow.java | 2 +- .../com/example/bigquerystorage/ReadTimestampAvro.java | 2 +- .../com/example/bigquerystorage/WriteToDefaultStream.java | 2 +- .../bigquerystorage/WriteToDefaultStreamTimestampJson.java | 2 +- .../WriteToDefaultStreamTimestampWithArrow.java | 7 +++++-- .../bigquerystorage/WriteToDefaultStreamWithArrow.java | 2 +- 7 files changed, 11 insertions(+), 8 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java index 08604d4d9d..43074b4805 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java @@ -175,7 +175,7 @@ private JsonStreamWriter createStreamWriter(String tableName) // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html return JsonStreamWriter.newBuilder(tableName, client) - .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) .setChannelProvider( BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java index 9df770d106..805721040d 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java @@ -118,7 +118,7 @@ public static void main(String... 
args) throws Exception { try (BigQueryReadClient client = BigQueryReadClient.create()) { String parent = String.format("projects/%s", projectId); - // This example uses baby name data from the public datasets. + // This example uses citibike data from the public datasets. String srcTable = String.format( "projects/%s/datasets/%s/tables/%s", diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java index 5b37364beb..d3e4ddcdce 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java @@ -85,7 +85,7 @@ public static void main(String... args) throws Exception { try (BigQueryReadClient client = BigQueryReadClient.create()) { String parent = String.format("projects/%s", projectId); - // This example uses baby name data from the public datasets. + // This example uses citibike data from the public datasets. String srcTable = String.format( "projects/%s/datasets/%s/tables/%s", diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 483238a816..828f81d873 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -176,7 +176,7 @@ private JsonStreamWriter createStreamWriter(String tableName) // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html return JsonStreamWriter.newBuilder(tableName, client) - .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) .setChannelProvider( BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java index 8dabb34e1c..2cdbdd7b60 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java @@ -157,7 +157,7 @@ private JsonStreamWriter createStreamWriter(String tableName) // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html return JsonStreamWriter.newBuilder(tableName, client) - .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) .setChannelProvider( BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java index a7611da662..bec5a13540 100644 --- 
a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java @@ -71,6 +71,8 @@ */ public class WriteToDefaultStreamTimestampWithArrow { + public static final long NANOS = 1000000000L; + public static void main(String[] args) throws InterruptedException, IOException { if (args.length < 3) { System.out.println( @@ -100,8 +102,9 @@ private static ArrowRecordBatch buildRecordBatch(VectorSchemaRoot root, int rowC TimeStampNanoTZVector timestampField = (TimeStampNanoTZVector) root.getVector("timestampField"); timestampField.allocateNew(rowCount); + Instant now = Instant.now(); for (int i = 0; i < rowCount; i++) { - timestampField.set(i, Instant.now().getNano()); + timestampField.set(i, now.getEpochSecond() * NANOS + now.getNano()); } root.setRowCount(rowCount); @@ -228,7 +231,7 @@ private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) // For more information about StreamWriter, see: // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.StreamWriter return StreamWriter.newBuilder(streamName, client) - .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) .setChannelProvider( BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java index d0bc455a9a..3374e715f1 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java @@ -236,7 +236,7 @@ private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) // For more information about StreamWriter, see: // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.StreamWriter return StreamWriter.newBuilder(streamName, client) - .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) .setChannelProvider( BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) From 66f7fcb6affd21752e439dc8f6c248ccc48fd9ba Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 6 Jan 2026 16:06:27 -0500 Subject: [PATCH 09/14] chore: Address code comments --- .../bigquerystorage/ReadTimestampArrow.java | 6 ++--- .../bigquerystorage/ReadTimestampAvro.java | 6 ++--- ...riteToDefaultStreamTimestampWithArrow.java | 22 +++++++++---------- .../WriteToDefaultStreamWithArrow.java | 22 +++++++++---------- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java index 805721040d..829bbb31e9 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java @@ -110,9 +110,9 
@@ public void close() { public static void main(String... args) throws Exception { // Sets your Google Cloud Platform project ID. String projectId = args[0]; - Integer snapshotMillis = null; + Long snapshotMillis = null; if (args.length > 1) { - snapshotMillis = Integer.parseInt(args[1]); + snapshotMillis = Long.parseLong(args[1]); } try (BigQueryReadClient client = BigQueryReadClient.create()) { @@ -142,7 +142,7 @@ public static void main(String... args) throws Exception { Timestamp t = Timestamp.newBuilder() .setSeconds(snapshotMillis / 1000) - .setNanos(((snapshotMillis % 1000) * 1000000)) + .setNanos((int) ((snapshotMillis % 1000) * 1000000)) .build(); TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build(); sessionBuilder.setTableModifiers(modifiers); diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java index d3e4ddcdce..6343c7739f 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java @@ -77,9 +77,9 @@ public void processRows(AvroRows avroRows) throws IOException { public static void main(String... args) throws Exception { // Sets your Google Cloud Platform project ID. String projectId = args[0]; - Integer snapshotMillis = null; + Long snapshotMillis = null; if (args.length > 1) { - snapshotMillis = Integer.parseInt(args[1]); + snapshotMillis = Long.parseLong(args[1]); } try (BigQueryReadClient client = BigQueryReadClient.create()) { @@ -109,7 +109,7 @@ public static void main(String... args) throws Exception { Timestamp t = Timestamp.newBuilder() .setSeconds(snapshotMillis / 1000) - .setNanos(((snapshotMillis % 1000) * 1000000)) + .setNanos((int) ((snapshotMillis % 1000) * 1000000)) .build(); TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build(); sessionBuilder.setTableModifiers(modifiers); diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java index bec5a13540..6797aea936 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java @@ -124,17 +124,17 @@ public static void writeToDefaultStreamWithArrow( // One time initialization for the worker. writer.initialize(parentTable, arrowSchema); long initialRowCount = getRowCount(parentTable); - BufferAllocator allocator = new RootAllocator(); - - // A writer should be used to ingest as much data as possible before teardown. - // Append 100 batches. - for (int i = 0; i < 100; i++) { - try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { - // Each batch has 10 rows. - ArrowRecordBatch batch = buildRecordBatch(root, 10); - - // Asynchronous append. - writer.append(new ArrowData(arrowSchema, batch)); + try (BufferAllocator allocator = new RootAllocator()) { + // A writer should be used to ingest as much data as possible before teardown. + // Append 100 batches. + for (int i = 0; i < 100; i++) { + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + // Each batch has 10 rows. + ArrowRecordBatch batch = buildRecordBatch(root, 10); + + // Asynchronous append. 
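+          // The append call hands the batch off without blocking on the server's
+          // acknowledgment, so successive batches can be pipelined on the stream.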
+ writer.append(new ArrowData(arrowSchema, batch)); + } } } // Final cleanup for the stream during worker teardown. diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java index 3374e715f1..d29577c1a4 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java @@ -129,17 +129,17 @@ public static void writeToDefaultStreamWithArrow( // One time initialization for the worker. writer.initialize(parentTable, arrowSchema); long initialRowCount = getRowCount(parentTable); - BufferAllocator allocator = new RootAllocator(); - - // A writer should be used to ingest as much data as possible before teardown. - // Append 100 batches. - for (int i = 0; i < 100; i++) { - try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { - // Each batch has 10 rows. - ArrowRecordBatch batch = buildRecordBatch(root, 10); - - // Asynchronous append. - writer.append(new ArrowData(arrowSchema, batch)); + try (BufferAllocator allocator = new RootAllocator()) { + // A writer should be used to ingest as much data as possible before teardown. + // Append 100 batches. + for (int i = 0; i < 100; i++) { + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + // Each batch has 10 rows. + ArrowRecordBatch batch = buildRecordBatch(root, 10); + + // Asynchronous append. + writer.append(new ArrowData(arrowSchema, batch)); + } } } // Final cleanup for the stream during worker teardown. From 7fa9f77813aa26c646a8545d780a9dc4ed09ee3e Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 6 Jan 2026 16:25:19 -0500 Subject: [PATCH 10/14] chore: Use long for expectedRowCount --- .../com/example/bigquerystorage/ExportOpenTelemetry.java | 8 ++++---- .../com/example/bigquerystorage/WriteToDefaultStream.java | 8 ++++---- .../WriteToDefaultStreamTimestampJson.java | 8 ++++---- .../bigquerystorage/WriteToDefaultStreamWithArrow.java | 7 +++---- 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java index 43074b4805..ea06c76e17 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java @@ -105,11 +105,11 @@ public static void exportToOpenTelemetry(String projectId, String datasetName, S // Final cleanup for the stream during worker teardown. 
writer.cleanup(); - verifyExpectedRowCount(parentTable, 12); + verifyExpectedRowCount(parentTable, 12L); System.out.println("Appended records successfully."); } - private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount) + private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount) throws InterruptedException { String queryRowCount = "SELECT COUNT(*) FROM `" @@ -122,8 +122,8 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); TableResult results = bigquery.query(queryConfig); - int countRowsActual = - Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + long countRowsActual = + Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); if (countRowsActual != expectedRowCount) { throw new RuntimeException( "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 828f81d873..fed1493587 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -106,11 +106,11 @@ public static void writeToDefaultStream(String projectId, String datasetName, St // Final cleanup for the stream during worker teardown. writer.cleanup(); - verifyExpectedRowCount(parentTable, 12); + verifyExpectedRowCount(parentTable, 12L); System.out.println("Appended records successfully."); } - private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount) + private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount) throws InterruptedException { String queryRowCount = "SELECT COUNT(*) FROM `" @@ -123,8 +123,8 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); TableResult results = bigquery.query(queryConfig); - int countRowsActual = - Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + long countRowsActual = + Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); if (countRowsActual != expectedRowCount) { throw new RuntimeException( "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java index 2cdbdd7b60..9bcb32d764 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java @@ -88,11 +88,11 @@ public static void writeToDefaultStream(String projectId, String datasetName, St // Final cleanup for the stream during worker teardown. 
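     // cleanup() closes the stream writer backing this worker before the row-count check below.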
writer.cleanup(); - verifyExpectedRowCount(parentTable, 20); + verifyExpectedRowCount(parentTable, 20L); System.out.println("Appended records successfully."); } - private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount) + private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount) throws InterruptedException { String queryRowCount = "SELECT COUNT(*) FROM `" @@ -105,8 +105,8 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); TableResult results = bigquery.query(queryConfig); - int countRowsActual = - Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + long countRowsActual = + Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); if (countRowsActual != expectedRowCount) { throw new RuntimeException( "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java index d29577c1a4..e1aa2ccb8f 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java @@ -180,8 +180,8 @@ private static void verifyExpectedRowCount(TableName parentTable, long expectedR BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(parentTable.getProject()).build().getService(); TableResult results = bigquery.query(queryConfig); - int countRowsActual = - Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + long countRowsActual = + Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); if (countRowsActual != expectedRowCount) { throw new RuntimeException( "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); @@ -216,8 +216,7 @@ private static class DataWriter { private final AtomicInteger recreateCount = new AtomicInteger(0); - private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) - throws DescriptorValidationException, IOException, InterruptedException { + private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) throws IOException { // Configure in-stream automatic retry settings. 
// Error codes that are immediately retried: // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED From 4d04a7ccb25f150cdda2baabb8ba1811651bc35f Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 6 Jan 2026 16:26:55 -0500 Subject: [PATCH 11/14] chore: Fix checkstyle issue --- .../example/bigquerystorage/WriteToDefaultStreamWithArrow.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java index e1aa2ccb8f..5db06efb04 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java @@ -216,7 +216,8 @@ private static class DataWriter { private final AtomicInteger recreateCount = new AtomicInteger(0); - private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) throws IOException { + private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) + throws IOException { // Configure in-stream automatic retry settings. // Error codes that are immediately retried: // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED From 6f2b3c8e99a16aaf53e4433887dbaa1d7cf3d777 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 7 Jan 2026 16:30:04 -0500 Subject: [PATCH 12/14] chore: Run tests in same thread --- .../cloud/bigquery/storage/v1/BigQueryReadClientTest.java | 3 +++ .../google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java | 3 +++ .../google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java | 3 +++ .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 3 +++ .../bigquery/storage/v1beta1/BigQueryStorageClientTest.java | 3 +++ .../src/test/resources/junit-platform.properties | 2 ++ 6 files changed, 17 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/test/resources/junit-platform.properties diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java index 250c2bef28..f049717a43 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java @@ -52,7 +52,10 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +@Execution(ExecutionMode.SAME_THREAD) class BigQueryReadClientTest { private static MockBigQueryRead mockBigQueryRead; private static MockServiceHelper serviceHelper; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 590385d765..25b4a3f7e1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -64,7 +64,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import 
From 6f2b3c8e99a16aaf53e4433887dbaa1d7cf3d777 Mon Sep 17 00:00:00 2001
From: Lawrence Qiu
Date: Wed, 7 Jan 2026 16:30:04 -0500
Subject: [PATCH 12/14] chore: Run tests in same thread

---
 .../cloud/bigquery/storage/v1/BigQueryReadClientTest.java      | 3 +++
 .../google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java | 3 +++
 .../google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java | 3 +++
 .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 3 +++
 .../bigquery/storage/v1beta1/BigQueryStorageClientTest.java    | 3 +++
 .../src/test/resources/junit-platform.properties               | 2 ++
 6 files changed, 17 insertions(+)
 create mode 100644 google-cloud-bigquerystorage/src/test/resources/junit-platform.properties

diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java
index 250c2bef28..f049717a43 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java
@@ -52,7 +52,10 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
+@Execution(ExecutionMode.SAME_THREAD)
 class BigQueryReadClientTest {
   private static MockBigQueryRead mockBigQueryRead;
   private static MockServiceHelper serviceHelper;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
index 590385d765..25b4a3f7e1 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
@@ -64,7 +64,10 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
+@Execution(ExecutionMode.SAME_THREAD)
 class ConnectionWorkerTest {
   private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
   private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/s1";
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 62082165b3..bf8202201f 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -66,7 +66,10 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
+@Execution(ExecutionMode.SAME_THREAD)
 class JsonStreamWriterTest {
 
   private static final int NUMERIC_SCALE = 9;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index e6bc3d3bb1..614ec75333 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -95,7 +95,10 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
+@Execution(ExecutionMode.SAME_THREAD)
 class StreamWriterTest {
 
   private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java
index 2a1efb4cd4..417cbed9fc 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java
@@ -63,7 +63,10 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
+@Execution(ExecutionMode.SAME_THREAD)
 class BigQueryStorageClientTest {
   private static MockBigQueryStorage mockBigQueryStorage;
   private static MockServiceHelper serviceHelper;
diff --git a/google-cloud-bigquerystorage/src/test/resources/junit-platform.properties b/google-cloud-bigquerystorage/src/test/resources/junit-platform.properties
new file mode 100644
index 0000000000..82de190bf7
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/resources/junit-platform.properties
@@ -0,0 +1,2 @@
+junit.jupiter.execution.parallel.enabled = true
+junit.jupiter.execution.parallel.mode.default = concurrent
\ No newline at end of file
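Taken together, the two pieces of this patch flip the suite to
parallel-by-default while pinning the listed classes to sequential execution:
junit-platform.properties enables JUnit 5 parallelism with a concurrent
default mode, and a class-level @Execution(ExecutionMode.SAME_THREAD) opts a
class back out. A self-contained sketch of the pattern, with made-up class
and test names:

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.parallel.Execution;
    import org.junit.jupiter.api.parallel.ExecutionMode;

    // With junit.jupiter.execution.parallel.mode.default = concurrent, tests
    // in other classes may interleave with these; SAME_THREAD keeps this
    // class's tests on a single thread, one after another.
    @Execution(ExecutionMode.SAME_THREAD)
    class ExampleSequentialTest {
      @Test
      void writesSharedState() { /* never overlaps the test below */ }

      @Test
      void readsSharedState() { /* never overlaps the test above */ }
    }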
From 8e234236c5666a29bb5bcba9608974a7c225d9ed Mon Sep 17 00:00:00 2001
From: Lawrence Qiu
Date: Mon, 12 Jan 2026 14:56:27 -0500
Subject: [PATCH 13/14] chore: Run certain tests single threaded

---
 .../bigquery/storage/v1/ConnectionWorkerPoolTest.java    | 3 +++
 .../cloud/bigquery/storage/v1/RequestProfilerTest.java   | 9 +++++++++
 .../bigquery/storage/v1/stub/ResourceHeaderTest.java     | 2 +-
 .../storage/v1beta1/stub/ResourceHeaderTest.java         | 2 +-
 .../bigquery/storage/v1beta2/BigQueryReadClientTest.java | 3 +++
 .../storage/v1beta2/stub/ResourceHeaderTest.java         | 2 +-
 6 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
index e7cbea060d..51fea1232b 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
@@ -47,7 +47,10 @@
 import java.util.function.Supplier;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
+@Execution(ExecutionMode.SAME_THREAD)
 class ConnectionWorkerPoolTest {
 
   private FakeBigQueryWrite testBigQueryWrite;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java
index e54ccda6ca..33460c190e 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java
@@ -31,7 +31,10 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
+@Execution(ExecutionMode.SAME_THREAD)
 class RequestProfilerTest {
   private static final Logger log = Logger.getLogger(RequestProfiler.class.getName());
 
@@ -164,6 +167,9 @@ void concurrentProfilingTest_1000ReqsRunTogether() throws Exception {
     assertTrue(reportText.contains("Request uuid: request_30 with total time"));
     assertTrue(reportText.contains("Request uuid: request_25 with total time"));
     assertTrue(reportText.contains("Request uuid: request_20 with total time"));
+
+    threadPool.shutdown();
+    threadPool.awaitTermination(10, TimeUnit.SECONDS);
   }
 
   @Test
@@ -208,5 +214,8 @@ void concurrentProfilingTest_RunWhileFlushing() throws Exception {
     }
     String reportText = profilerHook.flushAndGenerateReportText();
     assertTrue(reportText.contains("0 requests finished during"));
+
+    threadPool.shutdown();
+    threadPool.awaitTermination(10, TimeUnit.SECONDS);
   }
 }
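The threadPool.shutdown() plus awaitTermination additions above are the
standard ExecutorService teardown idiom. Leaking pool threads is easier to
get away with in a serial suite; once tests share a JVM concurrently, stray
threads from one test can skew timing or resource use in another. A generic
sketch of the idiom, with an arbitrary pool size and timeout:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    class PoolTeardownSketch {
      void runAndCleanUp() throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        try {
          threadPool.submit(() -> System.out.println("work"));
        } finally {
          threadPool.shutdown();                        // stop accepting new tasks
          if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
            threadPool.shutdownNow();                   // interrupt anything still running
          }
        }
      }
    }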
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java
index 7facd86eb8..90f0c395f3 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java
@@ -43,7 +43,7 @@ public class ResourceHeaderTest {
 
   private static final String TEST_STREAM_NAME = "streamName";
 
-  private static final String NAME = "resource-header-test:123";
+  private static final String NAME = "resource-header-test:123-v1";
 
   private static final String HEADER_NAME = "x-goog-request-params";
 
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/ResourceHeaderTest.java
index 9a571d90b9..b319042b59 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/ResourceHeaderTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/ResourceHeaderTest.java
@@ -51,7 +51,7 @@ public class ResourceHeaderTest {
 
   private static final Stream TEST_STREAM = Stream.newBuilder().setName("streamName").build();
 
-  private static final String NAME = "resource-header-test:123";
+  private static final String NAME = "resource-header-test:123-v1beta1";
 
   private static final String HEADER_NAME = "x-goog-request-params";
 
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java
index 262616fb39..b274569b95 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java
@@ -52,7 +52,10 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
+@Execution(ExecutionMode.SAME_THREAD)
 class BigQueryReadClientTest {
   private static MockBigQueryRead mockBigQueryRead;
   private static MockServiceHelper serviceHelper;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java
index d749018ea2..5b6bf390f8 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java
@@ -48,7 +48,7 @@ public class ResourceHeaderTest {
 
   private static final String TEST_STREAM_NAME = "streamName";
 
-  private static final String NAME = "resource-header-test:123";
+  private static final String NAME = "resource-header-test:123-v1beta2";
 
   private static final String HEADER_NAME = "x-goog-request-params";
 
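The -v1/-v1beta1/-v1beta2 suffixes give each versioned copy of
ResourceHeaderTest a distinct resource name. A plausible reading of the
motivation, given this commit's subject, is that with the suite now eligible
to run concurrently, per-version names make request-header assertions and
failure logs unambiguous about which variant produced them. Collapsed into
one place purely for illustration, the three constants end up as:

    // one distinct name per API surface under test (simplified; each
    // constant actually lives in its own versioned test class as NAME)
    private static final String NAME_V1 = "resource-header-test:123-v1";
    private static final String NAME_V1BETA1 = "resource-header-test:123-v1beta1";
    private static final String NAME_V1BETA2 = "resource-header-test:123-v1beta2";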
From c5f33358180320b0ed91f6cf306e3957a2a6e0b5 Mon Sep 17 00:00:00 2001
From: Lawrence Qiu
Date: Mon, 12 Jan 2026 16:05:34 -0500
Subject: [PATCH 14/14] chore: Add another test to run sequentially

---
 .../storage/v1/it/ITBigQueryStorageWriteClientTest.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java
index 285e9aadac..b7e46be405 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java
@@ -95,8 +95,11 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
 
 /** Integration tests for BigQuery Write API. */
+@Execution(ExecutionMode.SAME_THREAD)
 class ITBigQueryStorageWriteClientTest {
   private static final Logger LOG =
       Logger.getLogger(ITBigQueryStorageWriteClientTest.class.getName());