diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java
new file mode 100644
index 000000000000..d942340db5d2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * A date without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDate}, and base type is a {@link Long} that represents an
+ * incrementing count of days where day 0 is 1970-01-01 (ISO).
+ */
+public class Date implements Schema.LogicalType<LocalDate, Long> {
+
+ @Override
+ public String getIdentifier() {
+ return "beam:logical_type:date:v1";
+ }
+
+ // unused
+ @Override
+ public Schema.FieldType getArgumentType() {
+ return Schema.FieldType.STRING;
+ }
+
+ // unused
+ @Override
+ public String getArgument() {
+ return "";
+ }
+
+ @Override
+ public Schema.FieldType getBaseType() {
+ return Schema.FieldType.INT64;
+ }
+
+ @Override
+ public Long toBaseType(LocalDate input) {
+ return input == null ? null : input.toEpochDay();
+ }
+
+ @Override
+ public LocalDate toInputType(Long base) {
+ return base == null ? null : LocalDate.ofEpochDay(base);
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
new file mode 100644
index 000000000000..4dbab4825159
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+
+/** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType}s corresponding to SQL data types. */
+public class SqlTypes {
+
+ private SqlTypes() {}
+
+ /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */
+ public static final LogicalType<LocalDate, Long> DATE = new Date();
+}
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
index 8e76f4cc7bb1..0ce9f335bc89 100644
--- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
@@ -28,6 +28,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Strings;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
@@ -38,7 +39,7 @@ class SchemaUtils {
ImmutableMap.<String, FieldType>builder()
.put("BOOL", FieldType.BOOLEAN)
.put("BYTES", FieldType.BYTES)
- .put("DATE", FieldType.logicalType(new CalciteUtils.DateType()))
+ .put("DATE", FieldType.logicalType(SqlTypes.DATE))
.put("DATETIME", FieldType.DATETIME)
.put("DOUBLE", FieldType.DOUBLE)
.put("FLOAT", FieldType.DOUBLE)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index b9821aa1f237..25392ff7cc5e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -20,13 +20,13 @@
import static org.apache.beam.sdk.schemas.Schema.FieldType;
import static org.apache.beam.sdk.schemas.Schema.TypeName;
import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.math.BigDecimal;
+import java.time.LocalDate;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.Arrays;
@@ -39,11 +39,11 @@
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimestampWithLocalTzType;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -315,7 +315,7 @@ private static Expression castOutput(Expression value, FieldType toType) {
private static Expression castOutputTime(Expression value, FieldType toType) {
Expression valueDateTime = value;
- // First, convert to millis
+ // First, convert to millis (except for DATE type)
if (CalciteUtils.TIMESTAMP.typesEqual(toType)
|| CalciteUtils.NULLABLE_TIMESTAMP.typesEqual(toType)) {
if (value.getType() == java.sql.Timestamp.class) {
@@ -331,13 +331,16 @@ private static Expression castOutputTime(Expression value, FieldType toType) {
if (value.getType() == java.sql.Date.class) {
valueDateTime = Expressions.call(BuiltInMethod.DATE_TO_INT.method, valueDateTime);
}
- valueDateTime = Expressions.multiply(valueDateTime, Expressions.constant(MILLIS_PER_DAY));
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + toType);
}
- // Second, convert to joda Instant
- valueDateTime = Expressions.new_(Instant.class, valueDateTime);
+ // Second, convert to joda Instant (or LocalDate for DATE type)
+ if (CalciteUtils.DATE.typesEqual(toType) || CalciteUtils.NULLABLE_DATE.typesEqual(toType)) {
+ valueDateTime = Expressions.call(LocalDate.class, "ofEpochDay", valueDateTime);
+ } else {
+ valueDateTime = Expressions.new_(Instant.class, valueDateTime);
+ }
// Third, make conversion conditional on non-null input.
if (!((Class) value.getType()).isPrimitive()) {
@@ -371,9 +374,9 @@ private static class InputGetterImpl implements RexToLixTranslator.InputGetter {
.put(TypeName.ROW, Row.class)
.build();
- private static final Map<String, Class> LOGICAL_TYPE_CONVERSION_MAP =
+ private static final Map<String, Class> LOGICAL_TYPE_TO_BASE_TYPE_MAP =
ImmutableMap.<String, Class>builder()
- .put(DateType.IDENTIFIER, ReadableInstant.class)
+ .put(SqlTypes.DATE.getIdentifier(), Long.class)
.put(TimeType.IDENTIFIER, ReadableInstant.class)
.put(TimeWithLocalTzType.IDENTIFIER, ReadableInstant.class)
.put(TimestampWithLocalTzType.IDENTIFIER, ReadableInstant.class)
@@ -406,7 +409,7 @@ private static Expression value(
if (storageType == Object.class) {
convertTo = Object.class;
} else if (fromType.getTypeName().isLogicalType()) {
- convertTo = LOGICAL_TYPE_CONVERSION_MAP.get(fromType.getLogicalType().getIdentifier());
+ convertTo = LOGICAL_TYPE_TO_BASE_TYPE_MAP.get(fromType.getLogicalType().getIdentifier());
} else {
convertTo = TYPE_CONVERSION_MAP.get(fromType.getTypeName());
}
@@ -427,18 +430,13 @@ private static Expression value(
private static Expression value(Expression value, Schema.FieldType type) {
if (type.getTypeName().isLogicalType()) {
- Expression millisField = Expressions.call(value, "getMillis");
String logicalId = type.getLogicalType().getIdentifier();
- if (logicalId.equals(TimeType.IDENTIFIER)) {
- return nullOr(value, Expressions.convert_(millisField, int.class));
- } else if (logicalId.equals(DateType.IDENTIFIER)) {
- value =
- nullOr(
- value,
- Expressions.convert_(
- Expressions.divide(millisField, Expressions.constant(MILLIS_PER_DAY)),
- int.class));
- } else if (!logicalId.equals(CharType.IDENTIFIER)) {
+ if (TimeType.IDENTIFIER.equals(logicalId)) {
+ return nullOr(
+ value, Expressions.convert_(Expressions.call(value, "getMillis"), int.class));
+ } else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
+ value = nullOr(value, value);
+ } else if (!CharType.IDENTIFIER.equals(logicalId)) {
throw new UnsupportedOperationException(
"Unknown LogicalType " + type.getLogicalType().getIdentifier());
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 11820a0df943..1a688df93330 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY;
import java.io.IOException;
+import java.time.LocalDate;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -36,7 +36,6 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.Counter;
@@ -50,6 +49,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -303,11 +303,15 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
switch (type.getTypeName()) {
case LOGICAL_TYPE:
String logicalId = type.getLogicalType().getIdentifier();
- if (logicalId.equals(TimeType.IDENTIFIER)) {
+ if (TimeType.IDENTIFIER.equals(logicalId)) {
return (int) ((ReadableInstant) beamValue).getMillis();
- } else if (logicalId.equals(DateType.IDENTIFIER)) {
- return (int) (((ReadableInstant) beamValue).getMillis() / MILLIS_PER_DAY);
- } else if (logicalId.equals(CharType.IDENTIFIER)) {
+ } else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
+ if (beamValue instanceof Long) { // base type
+ return ((Long) beamValue).intValue();
+ } else { // input type
+ return (int) (((LocalDate) beamValue).toEpochDay());
+ }
+ } else if (CharType.IDENTIFIER.equals(logicalId)) {
return beamValue;
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + logicalId);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
index f3c2704cdbc4..aa8767464ef7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
@@ -22,7 +22,10 @@
import java.io.IOException;
import java.io.StringWriter;
import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.GregorianCalendar;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -114,8 +117,17 @@ public static Object autoCastField(Schema.Field field, Object rawObj) {
} else {
return rawObj;
}
+ } else if (CalciteUtils.DATE.typesEqual(type) || CalciteUtils.NULLABLE_DATE.typesEqual(type)) {
+ if (rawObj instanceof GregorianCalendar) { // used by the SQL CLI
+ GregorianCalendar calendar = (GregorianCalendar) rawObj;
+ return Instant.ofEpochMilli(calendar.getTimeInMillis())
+ .atZone(calendar.getTimeZone().toZoneId())
+ .toLocalDate();
+ } else {
+ return LocalDate.ofEpochDay((Integer) rawObj);
+ }
} else if (CalciteUtils.isDateTimeType(type)) {
- // Internal representation of DateType in Calcite is convertible to Joda's Datetime.
+ // Internal representation of Date in Calcite is convertible to Joda's Datetime.
return new DateTime(rawObj);
} else if (type.getTypeName().isNumericType()
&& ((rawObj instanceof String)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index e0b994d5815f..832656737754 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
@@ -43,15 +44,6 @@ public class CalciteUtils {
// SQL has schema types that do not directly correspond to Beam Schema types. We define
// LogicalTypes to represent each of these types.
- /** A LogicalType corresponding to DATE. */
- public static class DateType extends PassThroughLogicalType {
- public static final String IDENTIFIER = "SqlDateType";
-
- public DateType() {
- super(IDENTIFIER, FieldType.STRING, "", FieldType.DATETIME);
- }
- }
-
/** A LogicalType corresponding to TIME. */
public static class TimeType extends PassThroughLogicalType {
public static final String IDENTIFIER = "SqlTimeType";
@@ -96,7 +88,7 @@ public static boolean isDateTimeType(FieldType fieldType) {
if (fieldType.getTypeName().isLogicalType()) {
String logicalId = fieldType.getLogicalType().getIdentifier();
- return logicalId.equals(DateType.IDENTIFIER)
+ return logicalId.equals(SqlTypes.DATE.getIdentifier())
|| logicalId.equals(TimeType.IDENTIFIER)
|| logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
|| logicalId.equals(TimestampWithLocalTzType.IDENTIFIER);
@@ -128,9 +120,9 @@ public static boolean isStringType(FieldType fieldType) {
public static final FieldType VARBINARY = FieldType.BYTES;
public static final FieldType VARCHAR = FieldType.STRING;
public static final FieldType CHAR = FieldType.logicalType(new CharType());
- public static final FieldType DATE = FieldType.logicalType(new DateType());
+ public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE);
public static final FieldType NULLABLE_DATE =
- FieldType.logicalType(new DateType()).withNullable(true);
+ FieldType.logicalType(SqlTypes.DATE).withNullable(true);
public static final FieldType TIME = FieldType.logicalType(new TimeType());
public static final FieldType NULLABLE_TIME =
FieldType.logicalType(new TimeType()).withNullable(true);
@@ -205,12 +197,16 @@ public static SqlTypeName toSqlTypeName(FieldType type) {
return SqlTypeName.MAP;
default:
SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(type.withNullable(false));
- if (typeName != null) {
- return typeName;
- } else {
+ if (typeName == null) {
// This will happen e.g. if looking up a STRING type, and metadata isn't set to say which
// type of SQL string we want. In this case, use the default mapping.
- return BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
+ typeName = BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
+ }
+ if (typeName == null) {
+ throw new IllegalArgumentException(
+ String.format("Cannot find a matching Calcite SqlTypeName for Beam type: %s", type));
+ } else {
+ return typeName;
}
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
index 8b3d7e502d26..7cfcb951ca56 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql;
+import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -26,6 +27,7 @@
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -373,40 +375,6 @@ public void testNullInnerRow() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
- @Test
- public void testLogicalTypes() {
- DateTime dateTime = DateTime.parse("2020-02-02T00:00:00");
-
- Schema inputRowSchema =
- Schema.builder()
- .addField("timeTypeField", FieldType.logicalType(new DummySqlTimeType()))
- .addField("dateTypeField", FieldType.logicalType(new DummySqlDateType()))
- .build();
-
- Row row =
- Row.withSchema(inputRowSchema)
- .addValues(dateTime.getMillis(), dateTime.getMillis())
- .build();
-
- Schema outputRowSchema =
- Schema.builder()
- .addField("timeTypeField", FieldType.DATETIME)
- .addNullableField("dateTypeField", FieldType.DATETIME)
- .build();
-
- PCollection<Row> outputRow =
- pipeline
- .apply(Create.of(row).withRowSchema(inputRowSchema))
- .apply(
- SqlTransform.query(
- "SELECT timeTypeField, dateTypeField FROM PCOLLECTION GROUP BY timeTypeField, dateTypeField"));
-
- PAssert.that(outputRow)
- .containsInAnyOrder(Row.withSchema(outputRowSchema).addValues(dateTime, dateTime).build());
-
- pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
- }
-
private static class DummySqlTimeType implements Schema.LogicalType<Long, Instant> {
@Override
public String getIdentifier() {
@@ -439,38 +407,6 @@ public Long toInputType(Instant base) {
}
}
- private static class DummySqlDateType implements Schema.LogicalType<Long, Instant> {
- @Override
- public String getIdentifier() {
- return "SqlDateType";
- }
-
- @Override
- public FieldType getArgumentType() {
- return FieldType.STRING;
- }
-
- @Override
- public String getArgument() {
- return "";
- }
-
- @Override
- public Schema.FieldType getBaseType() {
- return Schema.FieldType.DATETIME;
- }
-
- @Override
- public Instant toBaseType(Long input) {
- return (input == null ? null : new Instant((long) input));
- }
-
- @Override
- public Long toInputType(Instant base) {
- return (base == null ? null : base.getMillis());
- }
- }
-
@Test
public void testNullDatetimeFields() {
Instant current = new Instant(1561671380000L); // Long value corresponds to 27/06/2019
@@ -483,14 +419,13 @@ public void testNullDatetimeFields() {
.addField("timeTypeField", FieldType.logicalType(new DummySqlTimeType()))
.addNullableField(
"nullableTimeTypeField", FieldType.logicalType(new DummySqlTimeType()))
- .addField("dateTypeField", FieldType.logicalType(new DummySqlDateType()))
- .addNullableField(
- "nullableDateTypeField", FieldType.logicalType(new DummySqlDateType()))
+ .addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
+ .addNullableField("nullableDateTypeField", FieldType.logicalType(SqlTypes.DATE))
.build();
Row dateTimeRow =
Row.withSchema(dateTimeFieldSchema)
- .addValues(current, null, date.getMillis(), null, current.getMillis(), null)
+ .addValues(current, null, date.getMillis(), null, LocalDate.of(2019, 6, 27), null)
.build();
PCollection<Row> outputRow =
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCastTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCastTest.java
index 01872a57dc71..8f5fe6500c22 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCastTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCastTest.java
@@ -17,16 +17,15 @@
*/
package org.apache.beam.sdk.extensions.sql;
-import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
-import static org.joda.time.DateTimeZone.UTC;
-
+import java.time.LocalDate;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
-import org.joda.time.DateTime;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -48,7 +47,10 @@ public void testCastToDate() {
.withRowSchema(INPUT_ROW_SCHEMA));
Schema resultType =
- Schema.builder().addInt32Field("f_int").addNullableField("f_date", DATETIME).build();
+ Schema.builder()
+ .addInt32Field("f_int")
+ .addNullableField("f_date", CalciteUtils.DATE)
+ .build();
PCollection<Row> result =
input.apply(
@@ -64,7 +66,7 @@ public void testCastToDate() {
PAssert.that(result)
.containsInAnyOrder(
- Row.withSchema(resultType).addValues(1, new DateTime(2018, 10, 18, 0, 0, UTC)).build());
+ Row.withSchema(resultType).addValues(1, LocalDate.of(2018, 10, 18)).build());
pipeline.run();
}
@@ -76,7 +78,11 @@ public void testCastToDateWithCase() {
Create.of(Row.withSchema(INPUT_ROW_SCHEMA).addValues(1).addValue("20181018").build())
.withRowSchema(INPUT_ROW_SCHEMA));
- Schema resultType = Schema.builder().addInt32Field("f_int").addDateTimeField("f_date").build();
+ Schema resultType =
+ Schema.builder()
+ .addInt32Field("f_int")
+ .addLogicalTypeField("f_date", SqlTypes.DATE)
+ .build();
PCollection<Row> result =
input.apply(
@@ -96,7 +102,7 @@ public void testCastToDateWithCase() {
PAssert.that(result)
.containsInAnyOrder(
- Row.withSchema(resultType).addValues(1, new DateTime(2018, 10, 18, 0, 0, UTC)).build());
+ Row.withSchema(resultType).addValues(1, LocalDate.of(2018, 10, 18)).build());
pipeline.run();
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 214c4fa225e4..5672849622dc 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -28,6 +28,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import java.time.LocalDate;
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -268,7 +269,7 @@ public void test_time_types() throws Exception {
assertEquals(3, row.getFieldCount());
// test DATE field
- assertEquals("2018-11-01", row.getDateTime("f_date").toString("yyyy-MM-dd"));
+ assertEquals("2018-11-01", row.getLogicalTypeValue("f_date", LocalDate.class).toString());
// test TIME field
assertEquals("15:23:59.000", row.getDateTime("f_time").toString("HH:mm:ss.SSS"));
// test TIMESTAMP field
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
index 28ac9f41a159..87cc62570d47 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql;
-import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseDate;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTime;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
import static org.hamcrest.Matchers.equalTo;
@@ -33,6 +32,7 @@
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
+import java.time.LocalDate;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
@@ -40,6 +40,7 @@
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.integrationtest.BeamSqlBuiltinFunctionsIntegrationTestBase;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -1196,9 +1197,18 @@ public void testDatetimeInfixPlus() {
.addExpr(
"TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR",
parseTimestampWithUTCTimeZone("1986-01-19 01:02:03"))
- .addExpr("DATE '1984-04-19' + INTERVAL '2' DAY", parseDate("1984-04-21"))
- .addExpr("DATE '1984-04-19' + INTERVAL '1' MONTH", parseDate("1984-05-19"))
- .addExpr("DATE '1984-04-19' + INTERVAL '3' YEAR", parseDate("1987-04-19"))
+ .addExpr(
+ "DATE '1984-04-19' + INTERVAL '2' DAY",
+ LocalDate.parse("1984-04-21"),
+ CalciteUtils.DATE)
+ .addExpr(
+ "DATE '1984-04-19' + INTERVAL '1' MONTH",
+ LocalDate.parse("1984-05-19"),
+ CalciteUtils.DATE)
+ .addExpr(
+ "DATE '1984-04-19' + INTERVAL '3' YEAR",
+ LocalDate.parse("1987-04-19"),
+ CalciteUtils.DATE)
.addExpr("TIME '14:28:30' + INTERVAL '15' SECOND", parseTime("14:28:45"))
.addExpr("TIME '14:28:30.239' + INTERVAL '4' MINUTE", parseTime("14:32:30.239"))
.addExpr("TIME '14:28:30.2' + INTERVAL '4' HOUR", parseTime("18:28:30.2"));
@@ -1317,9 +1327,18 @@ public void testTimestampMinusInterval() {
.addExpr(
"TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '1' YEAR",
parseTimestampWithUTCTimeZone("1983-01-19 01:01:58"))
- .addExpr("DATE '1984-04-19' - INTERVAL '2' DAY", parseDate("1984-04-17"))
- .addExpr("DATE '1984-04-19' - INTERVAL '1' MONTH", parseDate("1984-03-19"))
- .addExpr("DATE '1984-04-19' - INTERVAL '3' YEAR", parseDate("1981-04-19"))
+ .addExpr(
+ "DATE '1984-04-19' - INTERVAL '2' DAY",
+ LocalDate.parse("1984-04-17"),
+ CalciteUtils.DATE)
+ .addExpr(
+ "DATE '1984-04-19' - INTERVAL '1' MONTH",
+ LocalDate.parse("1984-03-19"),
+ CalciteUtils.DATE)
+ .addExpr(
+ "DATE '1984-04-19' - INTERVAL '3' YEAR",
+ LocalDate.parse("1981-04-19"),
+ CalciteUtils.DATE)
.addExpr("TIME '14:28:30' - INTERVAL '15' SECOND", parseTime("14:28:15"))
.addExpr("TIME '14:28:30.239' - INTERVAL '4' MINUTE", parseTime("14:24:30.239"))
.addExpr("TIME '14:28:30.2' - INTERVAL '4' HOUR", parseTime("10:28:30.2"));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index 0342cf03beb6..c25d70e08823 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -21,6 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.time.LocalDate;
import java.util.Iterator;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.testing.PAssert;
@@ -71,8 +72,12 @@ public Void apply(Iterable<Row> input) {
assertTrue(millis - row.getDateTime(1).getMillis() > -1000);
// CURRENT_DATE
- assertTrue(millis - row.getDateTime(2).getMillis() < MILLIS_PER_DAY);
- assertTrue(millis - row.getDateTime(2).getMillis() > -MILLIS_PER_DAY);
+ assertTrue(
+ millis - row.getLogicalTypeValue(2, LocalDate.class).toEpochDay() * MILLIS_PER_DAY
+ < MILLIS_PER_DAY);
+ assertTrue(
+ millis - row.getLogicalTypeValue(2, LocalDate.class).toEpochDay() * MILLIS_PER_DAY
+ > -MILLIS_PER_DAY);
// CURRENT_TIME
assertTrue(timeMillis - row.getDateTime(3).getMillis() < 1000);
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
index 22b2de97caaf..aa807fe80933 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
@@ -92,13 +92,28 @@ public class SqlStdOperatorMappingTable {
FunctionSignatureId.FN_IFNULL,
FunctionSignatureId.FN_NULLIF,
+ // Date functions
+ FunctionSignatureId.FN_CURRENT_DATE, // current_date
+ FunctionSignatureId.FN_EXTRACT_FROM_DATE, // $extract
+ FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY, // date
+ FunctionSignatureId.FN_DATE_FROM_TIMESTAMP, // date
+ // FunctionSignatureId.FN_DATE_FROM_DATETIME, // date
+ FunctionSignatureId.FN_DATE_ADD_DATE, // date_add
+ FunctionSignatureId.FN_DATE_SUB_DATE, // date_sub
+ FunctionSignatureId.FN_DATE_DIFF_DATE, // date_diff
+ FunctionSignatureId.FN_DATE_TRUNC_DATE, // date_trunc
+ FunctionSignatureId.FN_FORMAT_DATE, // format_date
+ FunctionSignatureId.FN_PARSE_DATE, // parse_date
+ FunctionSignatureId.FN_UNIX_DATE, // unix_date
+ FunctionSignatureId.FN_DATE_FROM_UNIX_DATE, // date_from_unix_date
+
// Timestamp functions
FunctionSignatureId.FN_CURRENT_TIMESTAMP, // current_timestamp
FunctionSignatureId.FN_EXTRACT_FROM_TIMESTAMP, // $extract
FunctionSignatureId.FN_STRING_FROM_TIMESTAMP, // string
FunctionSignatureId.FN_TIMESTAMP_FROM_STRING, // timestamp
FunctionSignatureId.FN_TIMESTAMP_FROM_DATE, // timestamp
- // FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME // timestamp
+ // FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp
FunctionSignatureId.FN_TIMESTAMP_ADD, // timestamp_add
FunctionSignatureId.FN_TIMESTAMP_SUB, // timestamp_sub
FunctionSignatureId.FN_TIMESTAMP_DIFF, // timestamp_diff
@@ -115,13 +130,9 @@ public class SqlStdOperatorMappingTable {
FunctionSignatureId.FN_TIMESTAMP_FROM_UNIX_MILLIS_INT64, // timestamp_from_unix_millis
// FunctionSignatureId.FN_TIMESTAMP_FROM_UNIX_MICROS_INT64, // timestamp_from_unix_micros
- // Date/Time/Datetime functions
- FunctionSignatureId.FN_EXTRACT_FROM_DATE,
+ // Time/Datetime functions
FunctionSignatureId.FN_EXTRACT_FROM_DATETIME,
- FunctionSignatureId.FN_EXTRACT_FROM_TIME,
- FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY
- // TODO: FunctionSignatureId.FN_DATE_FROM_TIMESTAMP
- );
+ FunctionSignatureId.FN_EXTRACT_FROM_TIME);
// todo: Some of operators defined here are later overridden in ZetaSQLPlannerImpl.
// We should remove them from this table and add generic way to provide custom
@@ -314,11 +325,6 @@ public class SqlStdOperatorMappingTable {
// .put("sha256")
// .put("sha512")
- // date functions
- // .put("date_add", SqlStdOperatorTable.DATETIME_PLUS)
- // .put("date_sub", SqlStdOperatorTable.MINUS_DATE)
- .put("date", SqlOperators.DATE_OP)
-
// time functions
// .put("time_add", SqlStdOperatorTable.DATETIME_PLUS)
// .put("time_sub", SqlStdOperatorTable.MINUS_DATE)
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
index a30825c2281d..df5ff9b7d096 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
@@ -18,20 +18,18 @@
package org.apache.beam.sdk.extensions.sql.zetasql;
import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
/** TestInput. */
class TestInput {
- public static final FieldType DATE = FieldType.logicalType(new DateType());
- public static final FieldType TIME = FieldType.logicalType(new TimeType());
public static final TestBoundedTable BASIC_TABLE_ONE =
TestBoundedTable.of(
@@ -155,20 +153,6 @@ class TestInput {
DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:13"),
7L);
- public static final TestBoundedTable TIME_TABLE =
- TestBoundedTable.of(
- Schema.builder()
- .addNullableField("f_date", DATE)
- .addNullableField("f_time", TIME)
- .addNullableField("f_timestamp", FieldType.DATETIME)
- .addNullableField("f_timestamp_with_time_zone", FieldType.DATETIME)
- .build())
- .addRows(
- DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-11 00:00:00"),
- DateTimeUtils.parseTimestampWithUTCTimeZone("1970-01-01 12:33:59.348"),
- DateTimeUtils.parseTimestampWithUTCTimeZone("2018-12-20 23:59:59.999"),
- DateTimeUtils.parseTimestampWithTimeZone("2018-12-10 10:38:59-1000"));
-
public static final TestBoundedTable TABLE_ALL_NULL =
TestBoundedTable.of(
Schema.builder()
@@ -249,6 +233,16 @@ class TestInput {
ImmutableMap.of("MAP_KEY_1", "MAP_VALUE_1"),
Row.withSchema(structSchema).addValues(1L, "data1").build());
+ private static final Schema TABLE_WITH_DATE_SCHEMA =
+ Schema.builder()
+ .addLogicalTypeField("date_field", SqlTypes.DATE)
+ .addStringField("str_field")
+ .build();
+ public static final TestBoundedTable TABLE_WITH_DATE =
+ TestBoundedTable.of(TABLE_WITH_DATE_SCHEMA)
+ .addRows(LocalDate.of(2008, 12, 25), "str1")
+ .addRows(LocalDate.of(2020, 04, 07), "str2");
+
public static byte[] stringToBytes(String s) {
return s.getBytes(StandardCharsets.UTF_8);
}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java
index f74e35ff245c..63c89cbfa95a 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java
@@ -25,15 +25,16 @@
import com.google.zetasql.TypeFactory;
import com.google.zetasql.Value;
import com.google.zetasql.ZetaSQLType.TypeKind;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
@@ -52,7 +53,8 @@ public final class ZetaSqlUtils {
private ZetaSqlUtils() {}
- public static SqlTypeName zetaSqlTypeToCalciteType(TypeKind zetaSqlType) {
+ // Type conversion: ZetaSQL => Calcite
+ public static SqlTypeName zetaSqlTypeToCalciteTypeName(TypeKind zetaSqlType) {
switch (zetaSqlType) {
case TYPE_INT64:
return SqlTypeName.BIGINT;
@@ -62,6 +64,8 @@ public static SqlTypeName zetaSqlTypeToCalciteType(TypeKind zetaSqlType) {
return SqlTypeName.DOUBLE;
case TYPE_STRING:
return SqlTypeName.VARCHAR;
+ case TYPE_DATE:
+ return SqlTypeName.DATE;
case TYPE_TIMESTAMP:
return SqlTypeName.TIMESTAMP;
case TYPE_BOOL:
@@ -70,10 +74,11 @@ public static SqlTypeName zetaSqlTypeToCalciteType(TypeKind zetaSqlType) {
return SqlTypeName.VARBINARY;
// TODO[BEAM-9179] Add conversion code for ARRAY and ROW types
default:
- throw new IllegalArgumentException("Unsupported ZetaSQL type: " + zetaSqlType.name());
+ throw new UnsupportedOperationException("Unknown ZetaSQL type: " + zetaSqlType.name());
}
}
+ // Type conversion: Beam => ZetaSQL
public static Type beamFieldTypeToZetaSqlType(FieldType fieldType) {
switch (fieldType.getTypeName()) {
case INT64:
@@ -93,31 +98,22 @@ public static Type beamFieldTypeToZetaSqlType(FieldType fieldType) {
case BYTES:
return TypeFactory.createSimpleType(TypeKind.TYPE_BYTES);
case ARRAY:
- return createZetaSqlArrayTypeFromBeamElementFieldType(fieldType.getCollectionElementType());
+ return beamElementFieldTypeToZetaSqlArrayType(fieldType.getCollectionElementType());
case ROW:
- return createZetaSqlStructTypeFromBeamSchema(fieldType.getRowSchema());
+ return beamSchemaToZetaSqlStructType(fieldType.getRowSchema());
case LOGICAL_TYPE:
- switch (fieldType.getLogicalType().getIdentifier()) {
- case DateType.IDENTIFIER:
- return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
- case TimeType.IDENTIFIER:
- return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
- default:
- throw new IllegalArgumentException(
- "Unsupported Beam logical type: " + fieldType.getLogicalType().getIdentifier());
- }
+ return beamLogicalTypeToZetaSqlType(fieldType.getLogicalType().getIdentifier());
default:
throw new UnsupportedOperationException(
- "Unsupported Beam fieldType: " + fieldType.getTypeName());
+ "Unknown Beam fieldType: " + fieldType.getTypeName());
}
}
- private static ArrayType createZetaSqlArrayTypeFromBeamElementFieldType(
- FieldType elementFieldType) {
+ private static ArrayType beamElementFieldTypeToZetaSqlArrayType(FieldType elementFieldType) {
return TypeFactory.createArrayType(beamFieldTypeToZetaSqlType(elementFieldType));
}
- public static StructType createZetaSqlStructTypeFromBeamSchema(Schema schema) {
+ public static StructType beamSchemaToZetaSqlStructType(Schema schema) {
return TypeFactory.createStructType(
schema.getFields().stream()
.map(ZetaSqlUtils::beamFieldToZetaSqlStructField)
@@ -128,6 +124,17 @@ private static StructField beamFieldToZetaSqlStructField(Field field) {
return new StructField(field.getName(), beamFieldTypeToZetaSqlType(field.getType()));
}
+ private static Type beamLogicalTypeToZetaSqlType(String identifier) {
+ if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+ return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
+ } else if (TimeType.IDENTIFIER.equals(identifier)) {
+ return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
+ } else {
+ throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
+ }
+ }
+
+ // Value conversion: Beam => ZetaSQL
public static Value javaObjectToZetaSqlValue(Object object, FieldType fieldType) {
if (object == null) {
return Value.createNullValue(beamFieldTypeToZetaSqlType(fieldType));
@@ -153,9 +160,11 @@ public static Value javaObjectToZetaSqlValue(Object object, FieldType fieldType)
(List