diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java index 60bb739b23..5842f6d068 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java @@ -30,6 +30,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.logging.Logger; /** * Converts a BQ table schema to protobuf descriptor. All field names will be converted to lowercase @@ -37,15 +39,18 @@ * shown in the ImmutableMaps below. */ public class BQTableSchemaToProtoDescriptor { - private static ImmutableMap - BQTableSchemaModeMap = - ImmutableMap.of( - TableFieldSchema.Mode.NULLABLE, FieldDescriptorProto.Label.LABEL_OPTIONAL, - TableFieldSchema.Mode.REPEATED, FieldDescriptorProto.Label.LABEL_REPEATED, - TableFieldSchema.Mode.REQUIRED, FieldDescriptorProto.Label.LABEL_REQUIRED); - private static ImmutableMap - BQTableSchemaTypeMap = + private static final Logger LOG = + Logger.getLogger(BQTableSchemaToProtoDescriptor.class.getName()); + + private static Map DEFAULT_BQ_TABLE_SCHEMA_MODE_MAP = + ImmutableMap.of( + TableFieldSchema.Mode.NULLABLE, FieldDescriptorProto.Label.LABEL_OPTIONAL, + TableFieldSchema.Mode.REPEATED, FieldDescriptorProto.Label.LABEL_REPEATED, + TableFieldSchema.Mode.REQUIRED, FieldDescriptorProto.Label.LABEL_REQUIRED); + + private static Map + DEFAULT_BQ_TABLE_SCHEMA_TYPE_MAP = new ImmutableMap.Builder() .put(TableFieldSchema.Type.BOOL, FieldDescriptorProto.Type.TYPE_BOOL) .put(TableFieldSchema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES) @@ -142,11 +147,13 @@ private static Descriptor convertBQTableSchemaToProtoDescriptorImpl( 
.setType(BQTableField.getRangeElementType().getType()) .setName("start") .setMode(Mode.NULLABLE) + .setTimestampPrecision(BQTableField.getTimestampPrecision()) .build(), TableFieldSchema.newBuilder() .setType(BQTableField.getRangeElementType().getType()) .setName("end") .setMode(Mode.NULLABLE) + .setTimestampPrecision(BQTableField.getTimestampPrecision()) .build()); if (dependencyMap.containsKey(rangeFields)) { @@ -189,7 +196,7 @@ private static Descriptor convertBQTableSchemaToProtoDescriptorImpl( * @param index Index for protobuf fields. * @param scope used to name descriptors */ - private static FieldDescriptorProto convertBQTableFieldToProtoField( + static FieldDescriptorProto convertBQTableFieldToProtoField( TableFieldSchema BQTableField, int index, String scope) { TableFieldSchema.Mode mode = BQTableField.getMode(); String fieldName = BQTableField.getName().toLowerCase(); @@ -198,7 +205,7 @@ private static FieldDescriptorProto convertBQTableFieldToProtoField( FieldDescriptorProto.newBuilder() .setName(fieldName) .setNumber(index) - .setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode)); + .setLabel((FieldDescriptorProto.Label) DEFAULT_BQ_TABLE_SCHEMA_MODE_MAP.get(mode)); switch (BQTableField.getType()) { case STRUCT: @@ -206,12 +213,37 @@ private static FieldDescriptorProto convertBQTableFieldToProtoField( break; case RANGE: fieldDescriptor.setType( - (FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType())); + (FieldDescriptorProto.Type) + DEFAULT_BQ_TABLE_SCHEMA_TYPE_MAP.get(BQTableField.getType())); fieldDescriptor.setTypeName(scope); break; + case TIMESTAMP: + // Can map to either int64 or string based on the BQ Field's timestamp precision + // Default: microsecond (6) maps to int64 and picosecond (12) maps to string. 
+ long timestampPrecision = BQTableField.getTimestampPrecision().getValue(); + if (timestampPrecision == 12L) { + fieldDescriptor.setType( + (FieldDescriptorProto.Type) FieldDescriptorProto.Type.TYPE_STRING); + break; + } + // This should never happen as this is a server response issue. If this is the case, + // warn the user and use INT64 as the default is microsecond precision. + if (timestampPrecision != 6L && timestampPrecision != 0L) { + LOG.warning( + "BigQuery Timestamp field " + + BQTableField.getName() + + " has timestamp precision that is not 6 or 12. Defaulting to microsecond" + + " precision and mapping to INT64 protobuf type."); + } + // If the timestampPrecision value comes back as a null result from the server, + // timestampPrecision has a value of 0L. Use the INT64 to map to the type used + // for the default precision (microsecond). + fieldDescriptor.setType((FieldDescriptorProto.Type) FieldDescriptorProto.Type.TYPE_INT64); + break; default: fieldDescriptor.setType( - (FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType())); + (FieldDescriptorProto.Type) + DEFAULT_BQ_TABLE_SCHEMA_TYPE_MAP.get(BQTableField.getType())); break; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java index 9a4fecf780..6e5643f002 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java @@ -15,8 +15,14 @@ */ package com.google.cloud.bigquery.storage.v1; +import static java.time.temporal.ChronoField.HOUR_OF_DAY; +import static java.time.temporal.ChronoField.MINUTE_OF_HOUR; +import static java.time.temporal.ChronoField.NANO_OF_SECOND; +import static java.time.temporal.ChronoField.SECOND_OF_MINUTE; + import 
com.google.api.pathtemplate.ValidationException; import com.google.cloud.bigquery.storage.v1.Exceptions.RowIndexToErrorException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Doubles; @@ -26,15 +32,18 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Timestamp; import com.google.protobuf.UninitializedMessageException; import java.math.BigDecimal; import java.math.RoundingMode; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; import java.time.format.TextStyle; import java.time.temporal.ChronoField; import java.time.temporal.TemporalAccessor; @@ -42,6 +51,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -63,7 +74,31 @@ public class JsonToProtoMessage implements ToProtoConverter { .put(FieldDescriptor.Type.STRING, "string") .put(FieldDescriptor.Type.MESSAGE, "object") .build(); - private static final DateTimeFormatter TIMESTAMP_FORMATTER = + + private static final DateTimeFormatter TO_TIMESTAMP_FORMATTER = + new DateTimeFormatterBuilder() + .parseLenient() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .optionalStart() + .appendLiteral('T') + .optionalEnd() + .appendValue(HOUR_OF_DAY, 2) + .appendLiteral(':') + .appendValue(MINUTE_OF_HOUR, 2) + .optionalStart() + .appendLiteral(':') + .appendValue(SECOND_OF_MINUTE, 2) + .optionalEnd() + .optionalStart() + .appendFraction(NANO_OF_SECOND, 6, 9, 
true) + .optionalEnd() + .optionalStart() + .appendOffset("+HHMM", "+00:00") + .optionalEnd() + .toFormatter() + .withZone(ZoneOffset.UTC); + + private static final DateTimeFormatter FROM_TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() .parseLenient() .append(DateTimeFormatter.ofPattern("yyyy[/][-]MM[/][-]dd")) @@ -120,6 +155,14 @@ public class JsonToProtoMessage implements ToProtoConverter { .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) .toFormatter(); + // Regex to identify >9 digits in the fraction part (e.g. `.123456789123`) + // Matches the dot, followed by 10+ digits (fractional part), followed by non-digits (like `+00`) + // or end of string + private static final Pattern ISO8601_TIMESTAMP_HIGH_PRECISION_PATTERN = + Pattern.compile("\\.(\\d{10,})(?:\\D|$)"); + private static final long MICROS_PER_SECOND = 1_000_000; + private static final int NANOS_PER_MICRO = 1_000; + /** You can use {@link #INSTANCE} instead */ public JsonToProtoMessage() {} @@ -620,25 +663,8 @@ private void fillField( return; } } else if (fieldSchema.getType() == TableFieldSchema.Type.TIMESTAMP) { - if (val instanceof String) { - Double parsed = Doubles.tryParse((String) val); - if (parsed != null) { - protoMsg.setField(fieldDescriptor, parsed.longValue()); - return; - } - TemporalAccessor parsedTime = TIMESTAMP_FORMATTER.parse((String) val); - protoMsg.setField( - fieldDescriptor, - parsedTime.getLong(ChronoField.INSTANT_SECONDS) * 1000000 - + parsedTime.getLong(ChronoField.MICRO_OF_SECOND)); - return; - } else if (val instanceof Long) { - protoMsg.setField(fieldDescriptor, val); - return; - } else if (val instanceof Integer) { - protoMsg.setField(fieldDescriptor, Long.valueOf((Integer) val)); - return; - } + protoMsg.setField(fieldDescriptor, getTimestampAsLong(val)); + return; } } if (val instanceof Integer) { @@ -685,6 +711,14 @@ private void fillField( } break; case STRING: + // Timestamp fields will be transmitted as a String if BQ's timestamp field is + // enabled to 
support picosecond. Check that the schema's field is timestamp before + // proceeding with the rest of the logic. Converts the supported types into a String. + // Supported types: https://docs.cloud.google.com/bigquery/docs/supported-data-types + if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIMESTAMP) { + protoMsg.setField(fieldDescriptor, getTimestampAsString(val)); + return; + } if (val instanceof String) { protoMsg.setField(fieldDescriptor, val); return; @@ -897,24 +931,7 @@ private void fillRepeatedField( } } else if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIMESTAMP) { - if (val instanceof String) { - Double parsed = Doubles.tryParse((String) val); - if (parsed != null) { - protoMsg.addRepeatedField(fieldDescriptor, parsed.longValue()); - } else { - TemporalAccessor parsedTime = TIMESTAMP_FORMATTER.parse((String) val); - protoMsg.addRepeatedField( - fieldDescriptor, - parsedTime.getLong(ChronoField.INSTANT_SECONDS) * 1000000 - + parsedTime.getLong(ChronoField.MICRO_OF_SECOND)); - } - } else if (val instanceof Long) { - protoMsg.addRepeatedField(fieldDescriptor, val); - } else if (val instanceof Integer) { - protoMsg.addRepeatedField(fieldDescriptor, Long.valueOf((Integer) val)); - } else { - throwWrongFieldType(fieldDescriptor, currentScope, index); - } + protoMsg.addRepeatedField(fieldDescriptor, getTimestampAsLong(val)); } else if (val instanceof Integer) { protoMsg.addRepeatedField(fieldDescriptor, Long.valueOf((Integer) val)); } else if (val instanceof Long) { @@ -958,6 +975,14 @@ private void fillRepeatedField( } break; case STRING: + // Timestamp fields will be transmitted as a String if BQ's timestamp field is + // enabled to support picosecond. Check that the schema's field is timestamp before + // proceeding with the rest of the logic. Converts the supported types into a String. 
+ // Supported types: https://docs.cloud.google.com/bigquery/docs/supported-data-types + if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIMESTAMP) { + protoMsg.addRepeatedField(fieldDescriptor, getTimestampAsString(val)); + return; + } if (val instanceof String) { protoMsg.addRepeatedField(fieldDescriptor, val); } else if (val instanceof Short @@ -1002,6 +1027,76 @@ private void fillRepeatedField( } } + /** + * Converts microseconds from epoch to a Java Instant. + * + * @param micros the number of microseconds from 1970-01-01T00:00:00Z + * @return the Instant corresponding to the microseconds + */ + @VisibleForTesting + static Instant fromEpochMicros(long micros) { + long seconds = Math.floorDiv(micros, MICROS_PER_SECOND); + int nanos = (int) Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICRO; + + return Instant.ofEpochSecond(seconds, nanos); + } + + /** + * Best effort to try and convert a timestamp to an ISO8601 string. Standardize the timestamp + * output to be ISO_DATE_TIME (e.g. 2011-12-03T10:15:30+01:00) for timestamps up to nanosecond + * precision. For higher precision, the ISO8601 input is used as long as it is valid. + */ + @VisibleForTesting + static String getTimestampAsString(Object val) { + if (val instanceof String) { + String value = (String) val; + Double parsed = Doubles.tryParse(value); + // If true, it was a numeric value inside a String + if (parsed != null) { + return getTimestampAsString(parsed.longValue()); + } + // Validate the ISO8601 values before sending it to the server. + validateTimestamp(value); + + // If it's high precision (more than 9 digits), then return the ISO8601 string as-is + // as JDK does not have a DateTimeFormatter that supports more than nanosecond precision. 
+ Matcher matcher = ISO8601_TIMESTAMP_HIGH_PRECISION_PATTERN.matcher(value); + if (matcher.find()) { + return value; + } + // Otherwise, output the timestamp to a standard format before sending it to BQ + Instant instant = FROM_TIMESTAMP_FORMATTER.parse(value, Instant::from); + return TO_TIMESTAMP_FORMATTER.format(instant); + } else if (val instanceof Number) { + // Micros from epoch will most likely be represented as a Long, but any numeric + // value can be used + Instant instant = fromEpochMicros(((Number) val).longValue()); + return TO_TIMESTAMP_FORMATTER.format(instant); + } else if (val instanceof Timestamp) { + // Convert the Protobuf timestamp class to ISO8601 string + Timestamp timestamp = (Timestamp) val; + return TO_TIMESTAMP_FORMATTER.format( + Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos())); + } + throw new IllegalArgumentException("The timestamp value passed in is not from a valid type"); + } + + /* Best effort to try and convert the Object to a long (microseconds since epoch) */ + private long getTimestampAsLong(Object val) { + if (val instanceof String) { + Double parsed = Doubles.tryParse((String) val); + if (parsed != null) { + return parsed.longValue(); + } + TemporalAccessor parsedTime = FROM_TIMESTAMP_FORMATTER.parse((String) val); + return parsedTime.getLong(ChronoField.INSTANT_SECONDS) * 1000000 + + parsedTime.getLong(ChronoField.MICRO_OF_SECOND); + } else if (val instanceof Number) { + return ((Number) val).longValue(); + } + throw new IllegalArgumentException("The timestamp value passed in is not from a valid type"); + } + private static void throwWrongFieldType( FieldDescriptor fieldDescriptor, String currentScope, int index) { throw new IllegalArgumentException( @@ -1009,4 +1104,43 @@ private static void throwWrongFieldType( "JSONObject does not have a %s field at %s[%d].", FIELD_TYPE_TO_DEBUG_MESSAGE.get(fieldDescriptor.getType()), currentScope, index)); } + + /** + * Internal helper method to check that the 
timestamp follows the expected String input of ISO8601 + * string. Allows the fractional portion of the timestamp to support up to 12 digits of precision + * (up to picosecond). + * + * @throws IllegalArgumentException if timestamp is invalid or exceeds picosecond precision + */ + @VisibleForTesting + static void validateTimestamp(String timestamp) { + // Check if the string has greater than nanosecond precision (>9 digits in fractional second) + Matcher matcher = ISO8601_TIMESTAMP_HIGH_PRECISION_PATTERN.matcher(timestamp); + if (matcher.find()) { + // Group 1 is the fractional second part of the ISO8601 string + String fraction = matcher.group(1); + // Pos 10-12 of the fractional second are guaranteed to be digits. The regex only + // matches the fraction section as long as they are digits. + if (fraction.length() > 12) { + throw new IllegalArgumentException( + "Fractional second portion of ISO8601 only supports up to picosecond (12 digits) in" + + " BigQuery"); + } + + // Replace the entire fractional second portion with just the nanosecond portion. 
+ // The new timestamp will be validated against the JDK's DateTimeFormatter + String truncatedFraction = fraction.substring(0, 9); + timestamp = + new StringBuilder(timestamp) + .replace(matcher.start(1), matcher.end(1), truncatedFraction) + .toString(); + } + + // It is valid as long as DateTimeFormatter doesn't throw an exception + try { + FROM_TIMESTAMP_FORMATTER.parse((String) timestamp); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException(e.getMessage(), e); + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java index ba845c1c12..51b78df183 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java @@ -15,13 +15,18 @@ */ package com.google.cloud.bigquery.storage.v1; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; -import com.google.cloud.bigquery.storage.test.JsonTest.*; -import com.google.cloud.bigquery.storage.test.SchemaTest.*; +import com.google.cloud.bigquery.storage.test.JsonTest; +import com.google.cloud.bigquery.storage.test.SchemaTest; import com.google.common.collect.ImmutableMap; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Int64Value; import java.util.HashMap; import java.util.Map; import org.junit.Test; @@ -32,21 +37,20 @@ public class BQTableSchemaToProtoDescriptorTest { // This is a map between the TableFieldSchema.Type and 
the descriptor it is supposed to // produce. The produced descriptor will be used to check against the entry values here. - private static ImmutableMap - BQTableTypeToCorrectProtoDescriptorTest = - new ImmutableMap.Builder() - .put(TableFieldSchema.Type.BOOL, BoolType.getDescriptor()) - .put(TableFieldSchema.Type.BYTES, BytesType.getDescriptor()) - .put(TableFieldSchema.Type.DATE, Int32Type.getDescriptor()) - .put(TableFieldSchema.Type.DATETIME, Int64Type.getDescriptor()) - .put(TableFieldSchema.Type.DOUBLE, DoubleType.getDescriptor()) - .put(TableFieldSchema.Type.GEOGRAPHY, StringType.getDescriptor()) - .put(TableFieldSchema.Type.INT64, Int64Type.getDescriptor()) - .put(TableFieldSchema.Type.NUMERIC, BytesType.getDescriptor()) - .put(TableFieldSchema.Type.STRING, StringType.getDescriptor()) - .put(TableFieldSchema.Type.TIME, Int64Type.getDescriptor()) - .put(TableFieldSchema.Type.TIMESTAMP, Int64Type.getDescriptor()) - .build(); + private static Map BQTableTypeToCorrectProtoDescriptorTest = + new ImmutableMap.Builder() + .put(TableFieldSchema.Type.BOOL, SchemaTest.BoolType.getDescriptor()) + .put(TableFieldSchema.Type.BYTES, SchemaTest.BytesType.getDescriptor()) + .put(TableFieldSchema.Type.DATE, SchemaTest.Int32Type.getDescriptor()) + .put(TableFieldSchema.Type.DATETIME, SchemaTest.Int64Type.getDescriptor()) + .put(TableFieldSchema.Type.DOUBLE, SchemaTest.DoubleType.getDescriptor()) + .put(TableFieldSchema.Type.GEOGRAPHY, SchemaTest.StringType.getDescriptor()) + .put(TableFieldSchema.Type.INT64, SchemaTest.Int64Type.getDescriptor()) + .put(TableFieldSchema.Type.NUMERIC, SchemaTest.BytesType.getDescriptor()) + .put(TableFieldSchema.Type.STRING, SchemaTest.StringType.getDescriptor()) + .put(TableFieldSchema.Type.TIME, SchemaTest.Int64Type.getDescriptor()) + .put(TableFieldSchema.Type.TIMESTAMP, SchemaTest.Int64Type.getDescriptor()) + .build(); // Creates mapping from descriptor to how many times it was reused. 
private void mapDescriptorToCount(Descriptor descriptor, HashMap map) { @@ -64,25 +68,28 @@ private void mapDescriptorToCount(Descriptor descriptor, HashMap JsonToProtoMessage.getTimestampAsString("2025-10-01")); + assertThrows( + IllegalArgumentException.class, () -> JsonToProtoMessage.getTimestampAsString("abc")); + assertThrows( + IllegalArgumentException.class, + () -> JsonToProtoMessage.getTimestampAsString(Timestamp.newBuilder())); + assertThrows( + IllegalArgumentException.class, + () -> JsonToProtoMessage.getTimestampAsString(new Object())); + assertThrows( + IllegalArgumentException.class, () -> JsonToProtoMessage.getTimestampAsString(null)); + } + + @Test + public void testFromEpochMicros() { + // The `+` is added if there are more than 4 digits for years + assertEquals( + "+294247-01-10T04:00:54.775807Z", + JsonToProtoMessage.fromEpochMicros(Long.MAX_VALUE).toString()); + assertEquals( + "-290308-12-21T19:59:05.224192Z", + JsonToProtoMessage.fromEpochMicros(Long.MIN_VALUE).toString()); + assertEquals(Instant.EPOCH.toString(), JsonToProtoMessage.fromEpochMicros(0L).toString()); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BQTableSchemaToProtoDescriptorTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BQTableSchemaToProtoDescriptorTest.java index 8e08418237..06faf91959 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BQTableSchemaToProtoDescriptorTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BQTableSchemaToProtoDescriptorTest.java @@ -65,9 +65,9 @@ private void mapDescriptorToCount(Descriptor descriptor, HashMap