diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java index 58babb50b..e28d8fcc7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java @@ -51,11 +51,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.DynamicMessage; import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; import io.grpc.Status; import io.grpc.Status.Code; import java.io.ByteArrayOutputStream; @@ -124,7 +127,7 @@ public class ITBigQueryStorageWriteClientTest { // Arrow is a bit special in that timestamps are limited to nanoseconds precision. // The data will be padded to fit into the higher precision columns. - public static final Object[][] INPUT_ARROW_WRITE_TIMESTAMPS = + private static final Object[][] INPUT_ARROW_WRITE_TIMESTAMPS = new Object[][] { {1735734896123456L /* 2025-01-01T12:34:56.123456Z */, 1735734896123456789L}, {1580646896123456L /* 2020-02-02T12:34:56.123456Z */, 1580646896123456789L}, @@ -134,7 +137,7 @@ public class ITBigQueryStorageWriteClientTest { // Arrow's higher precision column is padded with extra 0's if configured to return // ISO as output for any picosecond enabled column. - public static final Object[][] EXPECTED_ARROW_WRITE_TIMESTAMPS_ISO_OUTPUT = + private static final Object[][] EXPECTED_ARROW_WRITE_TIMESTAMPS_ISO_OUTPUT = new Object[][] { {1735734896123456L /* 2025-01-01T12:34:56.123456Z */, "2025-01-01T12:34:56.123456789000Z"}, {1580646896123456L /* 2020-02-02T12:34:56.123456Z */, "2020-02-02T12:34:56.123456789000Z"}, @@ -142,6 +145,27 @@ public class ITBigQueryStorageWriteClientTest { {165846896123456L /* 1975-04-04T12:34:56.123456Z */, "1975-04-04T12:34:56.123456789000Z"} }; + // Special case where users can use the Write API with Protobuf messages + // The format is two fields: 1. Seconds from epoch and 2. Subsecond fractional (millis, micros, + // nano, or pico). This test case is using picos sub-second fractional + private static final Long[][] INPUT_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS = + new Long[][] { + {1735734896L, 123456789123L}, /* 2025-01-01T12:34:56.123456789123Z */ + {1580646896L, 123456789123L}, /* 2020-02-02T12:34:56.123456789123Z */ + {636467696L, 123456789123L}, /* 1990-03-03T12:34:56.123456789123Z */ + {165846896L, 123456789123L} /* 1975-04-04T12:34:56.123456789123Z */ + }; + + // Expected ISO8601 output when using proto descriptors to write to BQ with pico precision + private static final String[] + EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT = + new String[] { + "2025-01-01T12:34:56.123456789123Z", + "2020-02-02T12:34:56.123456789123Z", + "1990-03-03T12:34:56.123456789123Z", + "1975-04-04T12:34:56.123456789123Z" + }; + public static class StringWithSecondsNanos { public String foo; public long seconds; @@ -2368,7 +2392,7 @@ public void timestamp_arrowWrite() throws IOException { @Test public void timestamp_protobufWrite() throws IOException, DescriptorValidationException, InterruptedException { - String tableName = "bqstorage_timestamp_write_protobuf"; + String tableName = "bqstorage_timestamp_write_protobuf_schema_aware"; // Opt to create a new table to write to instead of re-using table to prevent // the test from failing due to any issues with deleting data after test. // Increases the test time duration, but would be more resilient to transient @@ -2417,6 +2441,130 @@ public void timestamp_protobufWrite() assertTimestamps(tableName, EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT); } + // Tests that users can use a Protobuf message that contains second a fractional + // part (pico) to be written to BQ + @Test + public void timestamp_protobufWrite_customMessage_higherPrecision() + throws IOException, DescriptorValidationException { + String tableName = "bqstorage_timestamp_write_protobuf_custom_descriptor"; + // Opt to create a new table to write to instead of re-using table to prevent + // the test from failing due to any issues with deleting data after test. + // Increases the test time duration, but would be more resilient to transient + // failures + createTimestampTable(tableName); + + /* + A sample protobuf format: + message Wrapper { + message TimestampPicos { + int64 seconds = 1; + int64 picoseconds = 2; + } + Wrapper timestampHigherPrecision = 1; + // ... + } + */ + String wrapperProtoName = "Wrapper"; + String timestampPicosProtoName = "TimestampPicos"; + String secondsProtoName = "seconds"; + String picosProtoName = "picoseconds"; + DescriptorProto timestampPicosDescriptor = + DescriptorProto.newBuilder() + .setName(timestampPicosProtoName) + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName(secondsProtoName) + .setNumber(1) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64) + .build()) + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName(picosProtoName) + .setNumber(2) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64) + .build()) + .build(); + DescriptorProto wrapperDescriptor = + DescriptorProto.newBuilder() + .setName(wrapperProtoName) // random name + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME) + .setNumber(3) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE) + .setTypeName(timestampPicosDescriptor.getName()) + .build()) + .addNestedType(timestampPicosDescriptor) + .build(); + ProtoSchema protoSchema = + ProtoSchema.newBuilder().setProtoDescriptor(wrapperDescriptor).build(); + + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + try (StreamWriter streamWriter = + StreamWriter.newBuilder(parent.toString() + "/_default", writeClient) + .setWriterSchema(protoSchema) + .build()) { + DescriptorProtos.FileDescriptorProto fileProto = + DescriptorProtos.FileDescriptorProto.newBuilder() + .setName("test.proto") // dummy proto file + .addMessageType(wrapperDescriptor) + .build(); + + // Build the runtime descriptor (resolves types and names) + Descriptors.FileDescriptor file = + Descriptors.FileDescriptor.buildFrom(fileProto, new Descriptors.FileDescriptor[] {}); + + // Get the handle to the "wrapper" message type + Descriptors.Descriptor descriptor = file.findMessageTypeByName(wrapperProtoName); + + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + for (Long[] timestampParts : INPUT_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS) { + Message message = + DynamicMessage.newBuilder(descriptor) + .setField( + descriptor.findFieldByName(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME), + DynamicMessage.newBuilder( + descriptor.findNestedTypeByName(timestampPicosProtoName)) + .setField( + descriptor + .findNestedTypeByName(timestampPicosProtoName) + .findFieldByName(secondsProtoName), + timestampParts[0]) + .setField( + descriptor + .findNestedTypeByName(timestampPicosProtoName) + .findFieldByName(picosProtoName), + timestampParts[1]) + .build()) + .build(); + rowsBuilder.addSerializedRows(message.toByteString()); + } + ApiFuture future = streamWriter.append(rowsBuilder.build()); + ApiFutures.addCallback( + future, new Helper.AppendCompleteCallback(), MoreExecutors.directExecutor()); + } + String table = + BigQueryResource.formatTableResource( + ServiceOptions.getDefaultProjectId(), DATASET, tableName); + + // Read all the data as Avro GenericRecords + List rows = Helper.readAllRows(readClient, parentProjectId, table, null); + List timestampHigherPrecision = + rows.stream() + .map(x -> x.get(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME).toString()) + .collect(Collectors.toList()); + assertEquals( + EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT.length, + timestampHigherPrecision.size()); + for (int i = 0; + i < EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT.length; + i++) { + assertEquals( + EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT[i], + timestampHigherPrecision.get(i)); + } + } + private void createTimestampTable(String tableName) { Schema bqTableSchema = Schema.of(