From dce14f8b5fd15cacb68e56b43107bdcb068c2814 Mon Sep 17 00:00:00 2001 From: Yueyang Qiu Date: Mon, 23 Mar 2020 17:45:48 -0700 Subject: [PATCH] Support ZetaSQL DATE type as a Beam LogicalType --- .../beam/sdk/schemas/logicaltypes/Date.java | 65 ++++ .../sdk/schemas/logicaltypes/SqlTypes.java | 30 ++ .../provider/datacatalog/SchemaUtils.java | 3 +- .../extensions/sql/impl/rel/BeamCalcRel.java | 38 ++- .../sql/impl/rel/BeamEnumerableConverter.java | 16 +- .../sql/impl/schema/BeamTableUtils.java | 14 +- .../sql/impl/utils/CalciteUtils.java | 28 +- .../extensions/sql/BeamComplexTypeTest.java | 75 +---- .../sdk/extensions/sql/BeamSqlCastTest.java | 22 +- .../sdk/extensions/sql/BeamSqlCliTest.java | 3 +- .../sql/BeamSqlDslSqlStdOperatorsTest.java | 33 +- .../BeamSqlDateFunctionsIntegrationTest.java | 9 +- .../zetasql/SqlStdOperatorMappingTable.java | 30 +- .../sdk/extensions/sql/zetasql/TestInput.java | 30 +- .../extensions/sql/zetasql/ZetaSqlUtils.java | 77 +++-- .../translation/ExpressionConverter.java | 2 +- .../sql/zetasql/ZetaSQLDialectSpecTest.java | 287 +++++++++++++++++- 17 files changed, 573 insertions(+), 189 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java new file mode 100644 index 000000000000..d942340db5d2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import java.time.LocalDate; +import org.apache.beam.sdk.schemas.Schema; + +/** + * A date without a time-zone. + * + *

It cannot represent an instant on the time-line without additional information such as an + * offset or time-zone. + * + *

Its input type is a {@link LocalDate}, and base type is a {@link Long} that represents a + * incrementing count of days where day 0 is 1970-01-01 (ISO). + */ +public class Date implements Schema.LogicalType { + + @Override + public String getIdentifier() { + return "beam:logical_type:date:v1"; + } + + // unused + @Override + public Schema.FieldType getArgumentType() { + return Schema.FieldType.STRING; + } + + // unused + @Override + public String getArgument() { + return ""; + } + + @Override + public Schema.FieldType getBaseType() { + return Schema.FieldType.INT64; + } + + @Override + public Long toBaseType(LocalDate input) { + return input == null ? null : input.toEpochDay(); + } + + @Override + public LocalDate toInputType(Long base) { + return base == null ? null : LocalDate.ofEpochDay(base); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java new file mode 100644 index 000000000000..4dbab4825159 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import java.time.LocalDate; +import org.apache.beam.sdk.schemas.Schema.LogicalType; + +/** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType}s corresponding to SQL data types. */ +public class SqlTypes { + + private SqlTypes() {} + + /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */ + public static final LogicalType DATE = new Date(); +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java index 8e76f4cc7bb1..0ce9f335bc89 100644 --- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Strings; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap; @@ -38,7 +39,7 @@ class SchemaUtils { ImmutableMap.builder() .put("BOOL", FieldType.BOOLEAN) .put("BYTES", FieldType.BYTES) - .put("DATE", FieldType.logicalType(new CalciteUtils.DateType())) + .put("DATE", FieldType.logicalType(SqlTypes.DATE)) .put("DATETIME", FieldType.DATETIME) .put("DOUBLE", FieldType.DOUBLE) .put("FLOAT", FieldType.DOUBLE) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index b9821aa1f237..25392ff7cc5e 100644 
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -20,13 +20,13 @@ import static org.apache.beam.sdk.schemas.Schema.FieldType; import static org.apache.beam.sdk.schemas.Schema.TypeName; import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.lang.reflect.Type; import java.math.BigDecimal; +import java.time.LocalDate; import java.util.AbstractList; import java.util.AbstractMap; import java.util.Arrays; @@ -39,11 +39,11 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimestampWithLocalTzType; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -315,7 +315,7 @@ private static Expression castOutput(Expression value, FieldType toType) { private static Expression castOutputTime(Expression value, FieldType toType) { Expression valueDateTime = value; - // First, convert to millis + // First, convert to millis (except for DATE type) if 
(CalciteUtils.TIMESTAMP.typesEqual(toType) || CalciteUtils.NULLABLE_TIMESTAMP.typesEqual(toType)) { if (value.getType() == java.sql.Timestamp.class) { @@ -331,13 +331,16 @@ private static Expression castOutputTime(Expression value, FieldType toType) { if (value.getType() == java.sql.Date.class) { valueDateTime = Expressions.call(BuiltInMethod.DATE_TO_INT.method, valueDateTime); } - valueDateTime = Expressions.multiply(valueDateTime, Expressions.constant(MILLIS_PER_DAY)); } else { throw new UnsupportedOperationException("Unknown DateTime type " + toType); } - // Second, convert to joda Instant - valueDateTime = Expressions.new_(Instant.class, valueDateTime); + // Second, convert to joda Instant (or LocalDate for DATE type) + if (CalciteUtils.DATE.typesEqual(toType) || CalciteUtils.NULLABLE_DATE.typesEqual(toType)) { + valueDateTime = Expressions.call(LocalDate.class, "ofEpochDay", valueDateTime); + } else { + valueDateTime = Expressions.new_(Instant.class, valueDateTime); + } // Third, make conversion conditional on non-null input. 
if (!((Class) value.getType()).isPrimitive()) { @@ -371,9 +374,9 @@ private static class InputGetterImpl implements RexToLixTranslator.InputGetter { .put(TypeName.ROW, Row.class) .build(); - private static final Map LOGICAL_TYPE_CONVERSION_MAP = + private static final Map LOGICAL_TYPE_TO_BASE_TYPE_MAP = ImmutableMap.builder() - .put(DateType.IDENTIFIER, ReadableInstant.class) + .put(SqlTypes.DATE.getIdentifier(), Long.class) .put(TimeType.IDENTIFIER, ReadableInstant.class) .put(TimeWithLocalTzType.IDENTIFIER, ReadableInstant.class) .put(TimestampWithLocalTzType.IDENTIFIER, ReadableInstant.class) @@ -406,7 +409,7 @@ private static Expression value( if (storageType == Object.class) { convertTo = Object.class; } else if (fromType.getTypeName().isLogicalType()) { - convertTo = LOGICAL_TYPE_CONVERSION_MAP.get(fromType.getLogicalType().getIdentifier()); + convertTo = LOGICAL_TYPE_TO_BASE_TYPE_MAP.get(fromType.getLogicalType().getIdentifier()); } else { convertTo = TYPE_CONVERSION_MAP.get(fromType.getTypeName()); } @@ -427,18 +430,13 @@ private static Expression value( private static Expression value(Expression value, Schema.FieldType type) { if (type.getTypeName().isLogicalType()) { - Expression millisField = Expressions.call(value, "getMillis"); String logicalId = type.getLogicalType().getIdentifier(); - if (logicalId.equals(TimeType.IDENTIFIER)) { - return nullOr(value, Expressions.convert_(millisField, int.class)); - } else if (logicalId.equals(DateType.IDENTIFIER)) { - value = - nullOr( - value, - Expressions.convert_( - Expressions.divide(millisField, Expressions.constant(MILLIS_PER_DAY)), - int.class)); - } else if (!logicalId.equals(CharType.IDENTIFIER)) { + if (TimeType.IDENTIFIER.equals(logicalId)) { + return nullOr( + value, Expressions.convert_(Expressions.call(value, "getMillis"), int.class)); + } else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) { + value = nullOr(value, value); + } else if (!CharType.IDENTIFIER.equals(logicalId)) { throw new 
UnsupportedOperationException( "Unknown LogicalType " + type.getLogicalType().getIdentifier()); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 11820a0df943..1a688df93330 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -18,9 +18,9 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY; import java.io.IOException; +import java.time.LocalDate; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -36,7 +36,6 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.metrics.Counter; @@ -50,6 +49,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -303,11 +303,15 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) { switch (type.getTypeName()) { case LOGICAL_TYPE: String 
logicalId = type.getLogicalType().getIdentifier(); - if (logicalId.equals(TimeType.IDENTIFIER)) { + if (TimeType.IDENTIFIER.equals(logicalId)) { return (int) ((ReadableInstant) beamValue).getMillis(); - } else if (logicalId.equals(DateType.IDENTIFIER)) { - return (int) (((ReadableInstant) beamValue).getMillis() / MILLIS_PER_DAY); - } else if (logicalId.equals(CharType.IDENTIFIER)) { + } else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) { + if (beamValue instanceof Long) { // base type + return ((Long) beamValue).intValue(); + } else { // input type + return (int) (((LocalDate) beamValue).toEpochDay()); + } + } else if (CharType.IDENTIFIER.equals(logicalId)) { return beamValue; } else { throw new UnsupportedOperationException("Unknown DateTime type " + logicalId); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java index f3c2704cdbc4..aa8767464ef7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java @@ -22,7 +22,10 @@ import java.io.IOException; import java.io.StringWriter; import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; import java.util.ArrayList; +import java.util.GregorianCalendar; import java.util.List; import java.util.stream.IntStream; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; @@ -114,8 +117,17 @@ public static Object autoCastField(Schema.Field field, Object rawObj) { } else { return rawObj; } + } else if (CalciteUtils.DATE.typesEqual(type) || CalciteUtils.NULLABLE_DATE.typesEqual(type)) { + if (rawObj instanceof GregorianCalendar) { // used by the SQL CLI + GregorianCalendar calendar = (GregorianCalendar) rawObj; + return 
Instant.ofEpochMilli(calendar.getTimeInMillis()) + .atZone(calendar.getTimeZone().toZoneId()) + .toLocalDate(); + } else { + return LocalDate.ofEpochDay((Integer) rawObj); + } } else if (CalciteUtils.isDateTimeType(type)) { - // Internal representation of DateType in Calcite is convertible to Joda's Datetime. + // Internal representation of Date in Calcite is convertible to Joda's Datetime. return new DateTime(rawObj); } else if (type.getTypeName().isNumericType() && ((rawObj instanceof String) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index e0b994d5815f..832656737754 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap; @@ -43,15 +44,6 @@ public class CalciteUtils { // SQL has schema types that do not directly correspond to Beam Schema types. We define // LogicalTypes to represent each of these types. - /** A LogicalType corresponding to DATE. */ - public static class DateType extends PassThroughLogicalType { - public static final String IDENTIFIER = "SqlDateType"; - - public DateType() { - super(IDENTIFIER, FieldType.STRING, "", FieldType.DATETIME); - } - } - /** A LogicalType corresponding to TIME. 
*/ public static class TimeType extends PassThroughLogicalType { public static final String IDENTIFIER = "SqlTimeType"; @@ -96,7 +88,7 @@ public static boolean isDateTimeType(FieldType fieldType) { if (fieldType.getTypeName().isLogicalType()) { String logicalId = fieldType.getLogicalType().getIdentifier(); - return logicalId.equals(DateType.IDENTIFIER) + return logicalId.equals(SqlTypes.DATE.getIdentifier()) || logicalId.equals(TimeType.IDENTIFIER) || logicalId.equals(TimeWithLocalTzType.IDENTIFIER) || logicalId.equals(TimestampWithLocalTzType.IDENTIFIER); @@ -128,9 +120,9 @@ public static boolean isStringType(FieldType fieldType) { public static final FieldType VARBINARY = FieldType.BYTES; public static final FieldType VARCHAR = FieldType.STRING; public static final FieldType CHAR = FieldType.logicalType(new CharType()); - public static final FieldType DATE = FieldType.logicalType(new DateType()); + public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE); public static final FieldType NULLABLE_DATE = - FieldType.logicalType(new DateType()).withNullable(true); + FieldType.logicalType(SqlTypes.DATE).withNullable(true); public static final FieldType TIME = FieldType.logicalType(new TimeType()); public static final FieldType NULLABLE_TIME = FieldType.logicalType(new TimeType()).withNullable(true); @@ -205,12 +197,16 @@ public static SqlTypeName toSqlTypeName(FieldType type) { return SqlTypeName.MAP; default: SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(type.withNullable(false)); - if (typeName != null) { - return typeName; - } else { + if (typeName == null) { // This will happen e.g. if looking up a STRING type, and metadata isn't set to say which // type of SQL string we want. In this case, use the default mapping. 
- return BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type); + typeName = BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type); + } + if (typeName == null) { + throw new IllegalArgumentException( + String.format("Cannot find a matching Calcite SqlTypeName for Beam type: %s", type)); + } else { + return typeName; } } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java index 8b3d7e502d26..7cfcb951ca56 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql; +import java.time.LocalDate; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -26,6 +27,7 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -373,40 +375,6 @@ public void testNullInnerRow() { pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); } - @Test - public void testLogicalTypes() { - DateTime dateTime = DateTime.parse("2020-02-02T00:00:00"); - - Schema inputRowSchema = - Schema.builder() - .addField("timeTypeField", FieldType.logicalType(new DummySqlTimeType())) - .addField("dateTypeField", FieldType.logicalType(new DummySqlDateType())) - .build(); - - Row row = - Row.withSchema(inputRowSchema) - .addValues(dateTime.getMillis(), dateTime.getMillis()) - .build(); - - Schema outputRowSchema = - Schema.builder() - .addField("timeTypeField", FieldType.DATETIME) - 
.addNullableField("dateTypeField", FieldType.DATETIME) - .build(); - - PCollection outputRow = - pipeline - .apply(Create.of(row).withRowSchema(inputRowSchema)) - .apply( - SqlTransform.query( - "SELECT timeTypeField, dateTypeField FROM PCOLLECTION GROUP BY timeTypeField, dateTypeField")); - - PAssert.that(outputRow) - .containsInAnyOrder(Row.withSchema(outputRowSchema).addValues(dateTime, dateTime).build()); - - pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); - } - private static class DummySqlTimeType implements Schema.LogicalType { @Override public String getIdentifier() { @@ -439,38 +407,6 @@ public Long toInputType(Instant base) { } } - private static class DummySqlDateType implements Schema.LogicalType { - @Override - public String getIdentifier() { - return "SqlDateType"; - } - - @Override - public FieldType getArgumentType() { - return FieldType.STRING; - } - - @Override - public String getArgument() { - return ""; - } - - @Override - public Schema.FieldType getBaseType() { - return Schema.FieldType.DATETIME; - } - - @Override - public Instant toBaseType(Long input) { - return (input == null ? null : new Instant((long) input)); - } - - @Override - public Long toInputType(Instant base) { - return (base == null ? 
null : base.getMillis()); - } - } - @Test public void testNullDatetimeFields() { Instant current = new Instant(1561671380000L); // Long value corresponds to 27/06/2019 @@ -483,14 +419,13 @@ public void testNullDatetimeFields() { .addField("timeTypeField", FieldType.logicalType(new DummySqlTimeType())) .addNullableField( "nullableTimeTypeField", FieldType.logicalType(new DummySqlTimeType())) - .addField("dateTypeField", FieldType.logicalType(new DummySqlDateType())) - .addNullableField( - "nullableDateTypeField", FieldType.logicalType(new DummySqlDateType())) + .addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE)) + .addNullableField("nullableDateTypeField", FieldType.logicalType(SqlTypes.DATE)) .build(); Row dateTimeRow = Row.withSchema(dateTimeFieldSchema) - .addValues(current, null, date.getMillis(), null, current.getMillis(), null) + .addValues(current, null, date.getMillis(), null, LocalDate.of(2019, 6, 27), null) .build(); PCollection outputRow = diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCastTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCastTest.java index 01872a57dc71..8f5fe6500c22 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCastTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCastTest.java @@ -17,16 +17,15 @@ */ package org.apache.beam.sdk.extensions.sql; -import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME; -import static org.joda.time.DateTimeZone.UTC; - +import java.time.LocalDate; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; 
import org.apache.beam.sdk.values.Row; -import org.joda.time.DateTime; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -48,7 +47,10 @@ public void testCastToDate() { .withRowSchema(INPUT_ROW_SCHEMA)); Schema resultType = - Schema.builder().addInt32Field("f_int").addNullableField("f_date", DATETIME).build(); + Schema.builder() + .addInt32Field("f_int") + .addNullableField("f_date", CalciteUtils.DATE) + .build(); PCollection result = input.apply( @@ -64,7 +66,7 @@ public void testCastToDate() { PAssert.that(result) .containsInAnyOrder( - Row.withSchema(resultType).addValues(1, new DateTime(2018, 10, 18, 0, 0, UTC)).build()); + Row.withSchema(resultType).addValues(1, LocalDate.of(2018, 10, 18)).build()); pipeline.run(); } @@ -76,7 +78,11 @@ public void testCastToDateWithCase() { Create.of(Row.withSchema(INPUT_ROW_SCHEMA).addValues(1).addValue("20181018").build()) .withRowSchema(INPUT_ROW_SCHEMA)); - Schema resultType = Schema.builder().addInt32Field("f_int").addDateTimeField("f_date").build(); + Schema resultType = + Schema.builder() + .addInt32Field("f_int") + .addLogicalTypeField("f_date", SqlTypes.DATE) + .build(); PCollection result = input.apply( @@ -96,7 +102,7 @@ public void testCastToDateWithCase() { PAssert.that(result) .containsInAnyOrder( - Row.withSchema(resultType).addValues(1, new DateTime(2018, 10, 18, 0, 0, UTC)).build()); + Row.withSchema(resultType).addValues(1, LocalDate.of(2018, 10, 18)).build()); pipeline.run(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java index 214c4fa225e4..5672849622dc 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java @@ -28,6 +28,7 @@ import static 
org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import java.time.LocalDate; import java.util.stream.Stream; import org.apache.beam.sdk.extensions.sql.impl.ParseException; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -268,7 +269,7 @@ public void test_time_types() throws Exception { assertEquals(3, row.getFieldCount()); // test DATE field - assertEquals("2018-11-01", row.getDateTime("f_date").toString("yyyy-MM-dd")); + assertEquals("2018-11-01", row.getLogicalTypeValue("f_date", LocalDate.class).toString()); // test TIME field assertEquals("15:23:59.000", row.getDateTime("f_time").toString("HH:mm:ss.SSS")); // test TIMESTAMP field diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java index 28ac9f41a159..87cc62570d47 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.extensions.sql; -import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseDate; import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTime; import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.hamcrest.Matchers.equalTo; @@ -33,6 +32,7 @@ import java.lang.reflect.Method; import java.math.BigDecimal; import java.math.RoundingMode; +import java.time.LocalDate; import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; @@ -40,6 +40,7 @@ import java.util.Random; import java.util.Set; import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import 
org.apache.beam.sdk.extensions.sql.integrationtest.BeamSqlBuiltinFunctionsIntegrationTestBase; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -1196,9 +1197,18 @@ public void testDatetimeInfixPlus() { .addExpr( "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR", parseTimestampWithUTCTimeZone("1986-01-19 01:02:03")) - .addExpr("DATE '1984-04-19' + INTERVAL '2' DAY", parseDate("1984-04-21")) - .addExpr("DATE '1984-04-19' + INTERVAL '1' MONTH", parseDate("1984-05-19")) - .addExpr("DATE '1984-04-19' + INTERVAL '3' YEAR", parseDate("1987-04-19")) + .addExpr( + "DATE '1984-04-19' + INTERVAL '2' DAY", + LocalDate.parse("1984-04-21"), + CalciteUtils.DATE) + .addExpr( + "DATE '1984-04-19' + INTERVAL '1' MONTH", + LocalDate.parse("1984-05-19"), + CalciteUtils.DATE) + .addExpr( + "DATE '1984-04-19' + INTERVAL '3' YEAR", + LocalDate.parse("1987-04-19"), + CalciteUtils.DATE) .addExpr("TIME '14:28:30' + INTERVAL '15' SECOND", parseTime("14:28:45")) .addExpr("TIME '14:28:30.239' + INTERVAL '4' MINUTE", parseTime("14:32:30.239")) .addExpr("TIME '14:28:30.2' + INTERVAL '4' HOUR", parseTime("18:28:30.2")); @@ -1317,9 +1327,18 @@ public void testTimestampMinusInterval() { .addExpr( "TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '1' YEAR", parseTimestampWithUTCTimeZone("1983-01-19 01:01:58")) - .addExpr("DATE '1984-04-19' - INTERVAL '2' DAY", parseDate("1984-04-17")) - .addExpr("DATE '1984-04-19' - INTERVAL '1' MONTH", parseDate("1984-03-19")) - .addExpr("DATE '1984-04-19' - INTERVAL '3' YEAR", parseDate("1981-04-19")) + .addExpr( + "DATE '1984-04-19' - INTERVAL '2' DAY", + LocalDate.parse("1984-04-17"), + CalciteUtils.DATE) + .addExpr( + "DATE '1984-04-19' - INTERVAL '1' MONTH", + LocalDate.parse("1984-03-19"), + CalciteUtils.DATE) + .addExpr( + "DATE '1984-04-19' - INTERVAL '3' YEAR", + LocalDate.parse("1981-04-19"), + CalciteUtils.DATE) .addExpr("TIME '14:28:30' - INTERVAL '15' SECOND", parseTime("14:28:15")) .addExpr("TIME 
'14:28:30.239' - INTERVAL '4' MINUTE", parseTime("14:24:30.239")) .addExpr("TIME '14:28:30.2' - INTERVAL '4' HOUR", parseTime("10:28:30.2")); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java index 0342cf03beb6..c25d70e08823 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.time.LocalDate; import java.util.Iterator; import org.apache.beam.sdk.extensions.sql.SqlTransform; import org.apache.beam.sdk.testing.PAssert; @@ -71,8 +72,12 @@ public Void apply(Iterable input) { assertTrue(millis - row.getDateTime(1).getMillis() > -1000); // CURRENT_DATE - assertTrue(millis - row.getDateTime(2).getMillis() < MILLIS_PER_DAY); - assertTrue(millis - row.getDateTime(2).getMillis() > -MILLIS_PER_DAY); + assertTrue( + millis - row.getLogicalTypeValue(2, LocalDate.class).toEpochDay() * MILLIS_PER_DAY + < MILLIS_PER_DAY); + assertTrue( + millis - row.getLogicalTypeValue(2, LocalDate.class).toEpochDay() * MILLIS_PER_DAY + > -MILLIS_PER_DAY); // CURRENT_TIME assertTrue(timeMillis - row.getDateTime(3).getMillis() < 1000); diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java index 22b2de97caaf..aa807fe80933 100644 --- 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java @@ -92,13 +92,28 @@ public class SqlStdOperatorMappingTable { FunctionSignatureId.FN_IFNULL, FunctionSignatureId.FN_NULLIF, + // Date functions + FunctionSignatureId.FN_CURRENT_DATE, // current_date + FunctionSignatureId.FN_EXTRACT_FROM_DATE, // $extract + FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY, // date + FunctionSignatureId.FN_DATE_FROM_TIMESTAMP, // date + // FunctionSignatureId.FN_DATE_FROM_DATETIME, // date + FunctionSignatureId.FN_DATE_ADD_DATE, // date_add + FunctionSignatureId.FN_DATE_SUB_DATE, // date_sub + FunctionSignatureId.FN_DATE_DIFF_DATE, // date_diff + FunctionSignatureId.FN_DATE_TRUNC_DATE, // date_trunc + FunctionSignatureId.FN_FORMAT_DATE, // format_date + FunctionSignatureId.FN_PARSE_DATE, // parse_date + FunctionSignatureId.FN_UNIX_DATE, // unix_date + FunctionSignatureId.FN_DATE_FROM_UNIX_DATE, // date_from_unix_date + // Timestamp functions FunctionSignatureId.FN_CURRENT_TIMESTAMP, // current_timestamp FunctionSignatureId.FN_EXTRACT_FROM_TIMESTAMP, // $extract FunctionSignatureId.FN_STRING_FROM_TIMESTAMP, // string FunctionSignatureId.FN_TIMESTAMP_FROM_STRING, // timestamp FunctionSignatureId.FN_TIMESTAMP_FROM_DATE, // timestamp - // FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME // timestamp + // FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp FunctionSignatureId.FN_TIMESTAMP_ADD, // timestamp_add FunctionSignatureId.FN_TIMESTAMP_SUB, // timestamp_sub FunctionSignatureId.FN_TIMESTAMP_DIFF, // timestamp_diff @@ -115,13 +130,9 @@ public class SqlStdOperatorMappingTable { FunctionSignatureId.FN_TIMESTAMP_FROM_UNIX_MILLIS_INT64, // timestamp_from_unix_millis // FunctionSignatureId.FN_TIMESTAMP_FROM_UNIX_MICROS_INT64, // timestamp_from_unix_micros - // Date/Time/Datetime 
functions - FunctionSignatureId.FN_EXTRACT_FROM_DATE, + // Time/Datetime functions FunctionSignatureId.FN_EXTRACT_FROM_DATETIME, - FunctionSignatureId.FN_EXTRACT_FROM_TIME, - FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY - // TODO: FunctionSignatureId.FN_DATE_FROM_TIMESTAMP - ); + FunctionSignatureId.FN_EXTRACT_FROM_TIME); // todo: Some of operators defined here are later overridden in ZetaSQLPlannerImpl. // We should remove them from this table and add generic way to provide custom @@ -314,11 +325,6 @@ public class SqlStdOperatorMappingTable { // .put("sha256") // .put("sha512") - // date functions - // .put("date_add", SqlStdOperatorTable.DATETIME_PLUS) - // .put("date_sub", SqlStdOperatorTable.MINUS_DATE) - .put("date", SqlOperators.DATE_OP) - // time functions // .put("time_add", SqlStdOperatorTable.DATETIME_PLUS) // .put("time_sub", SqlStdOperatorTable.MINUS_DATE) diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java index a30825c2281d..df5ff9b7d096 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java @@ -18,20 +18,18 @@ package org.apache.beam.sdk.extensions.sql.zetasql; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; import java.util.Arrays; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.values.Row; import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** TestInput. */ class TestInput { - public static final FieldType DATE = FieldType.logicalType(new DateType()); - public static final FieldType TIME = FieldType.logicalType(new TimeType()); public static final TestBoundedTable BASIC_TABLE_ONE = TestBoundedTable.of( @@ -155,20 +153,6 @@ class TestInput { DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:13"), 7L); - public static final TestBoundedTable TIME_TABLE = - TestBoundedTable.of( - Schema.builder() - .addNullableField("f_date", DATE) - .addNullableField("f_time", TIME) - .addNullableField("f_timestamp", FieldType.DATETIME) - .addNullableField("f_timestamp_with_time_zone", FieldType.DATETIME) - .build()) - .addRows( - DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-11 00:00:00"), - DateTimeUtils.parseTimestampWithUTCTimeZone("1970-01-01 12:33:59.348"), - DateTimeUtils.parseTimestampWithUTCTimeZone("2018-12-20 23:59:59.999"), - DateTimeUtils.parseTimestampWithTimeZone("2018-12-10 10:38:59-1000")); - public static final TestBoundedTable TABLE_ALL_NULL = TestBoundedTable.of( Schema.builder() @@ -249,6 +233,16 @@ class TestInput { ImmutableMap.of("MAP_KEY_1", "MAP_VALUE_1"), Row.withSchema(structSchema).addValues(1L, "data1").build()); + private static final Schema TABLE_WITH_DATE_SCHEMA = + Schema.builder() + .addLogicalTypeField("date_field", SqlTypes.DATE) + .addStringField("str_field") + .build(); + public static final TestBoundedTable TABLE_WITH_DATE = + TestBoundedTable.of(TABLE_WITH_DATE_SCHEMA) + .addRows(LocalDate.of(2008, 12, 25), "str1") + .addRows(LocalDate.of(2020, 4, 7), "str2"); + public static byte[] stringToBytes(String s) { return s.getBytes(StandardCharsets.UTF_8); } diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java ­
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java index f74e35ff245c..63c89cbfa95a 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java @@ -25,15 +25,16 @@ import com.google.zetasql.TypeFactory; import com.google.zetasql.Value; import com.google.zetasql.ZetaSQLType.TypeKind; +import java.time.LocalDate; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; @@ -52,7 +53,8 @@ public final class ZetaSqlUtils { private ZetaSqlUtils() {} - public static SqlTypeName zetaSqlTypeToCalciteType(TypeKind zetaSqlType) { + // Type conversion: ZetaSQL => Calcite + public static SqlTypeName zetaSqlTypeToCalciteTypeName(TypeKind zetaSqlType) { switch (zetaSqlType) { case TYPE_INT64: return SqlTypeName.BIGINT; @@ -62,6 +64,8 @@ public static SqlTypeName zetaSqlTypeToCalciteType(TypeKind zetaSqlType) { return SqlTypeName.DOUBLE; case TYPE_STRING: return SqlTypeName.VARCHAR; + case TYPE_DATE: + return SqlTypeName.DATE; case TYPE_TIMESTAMP: return SqlTypeName.TIMESTAMP; case TYPE_BOOL: @@ -70,10 +74,11 @@ public static SqlTypeName zetaSqlTypeToCalciteType(TypeKind zetaSqlType) { return SqlTypeName.VARBINARY; // 
TODO[BEAM-9179] Add conversion code for ARRAY and ROW types default: - throw new IllegalArgumentException("Unsupported ZetaSQL type: " + zetaSqlType.name()); + throw new UnsupportedOperationException("Unknown ZetaSQL type: " + zetaSqlType.name()); } } + // Type conversion: Beam => ZetaSQL public static Type beamFieldTypeToZetaSqlType(FieldType fieldType) { switch (fieldType.getTypeName()) { case INT64: @@ -93,31 +98,22 @@ public static Type beamFieldTypeToZetaSqlType(FieldType fieldType) { case BYTES: return TypeFactory.createSimpleType(TypeKind.TYPE_BYTES); case ARRAY: - return createZetaSqlArrayTypeFromBeamElementFieldType(fieldType.getCollectionElementType()); + return beamElementFieldTypeToZetaSqlArrayType(fieldType.getCollectionElementType()); case ROW: - return createZetaSqlStructTypeFromBeamSchema(fieldType.getRowSchema()); + return beamSchemaToZetaSqlStructType(fieldType.getRowSchema()); case LOGICAL_TYPE: - switch (fieldType.getLogicalType().getIdentifier()) { - case DateType.IDENTIFIER: - return TypeFactory.createSimpleType(TypeKind.TYPE_DATE); - case TimeType.IDENTIFIER: - return TypeFactory.createSimpleType(TypeKind.TYPE_TIME); - default: - throw new IllegalArgumentException( - "Unsupported Beam logical type: " + fieldType.getLogicalType().getIdentifier()); - } + return beamLogicalTypeToZetaSqlType(fieldType.getLogicalType().getIdentifier()); default: throw new UnsupportedOperationException( - "Unsupported Beam fieldType: " + fieldType.getTypeName()); + "Unknown Beam fieldType: " + fieldType.getTypeName()); } } - private static ArrayType createZetaSqlArrayTypeFromBeamElementFieldType( - FieldType elementFieldType) { + private static ArrayType beamElementFieldTypeToZetaSqlArrayType(FieldType elementFieldType) { return TypeFactory.createArrayType(beamFieldTypeToZetaSqlType(elementFieldType)); } - public static StructType createZetaSqlStructTypeFromBeamSchema(Schema schema) { + public static StructType beamSchemaToZetaSqlStructType(Schema schema) { return 
TypeFactory.createStructType( schema.getFields().stream() .map(ZetaSqlUtils::beamFieldToZetaSqlStructField) @@ -128,6 +124,17 @@ private static StructField beamFieldToZetaSqlStructField(Field field) { return new StructField(field.getName(), beamFieldTypeToZetaSqlType(field.getType())); } + private static Type beamLogicalTypeToZetaSqlType(String identifier) { + if (SqlTypes.DATE.getIdentifier().equals(identifier)) { + return TypeFactory.createSimpleType(TypeKind.TYPE_DATE); + } else if (TimeType.IDENTIFIER.equals(identifier)) { + return TypeFactory.createSimpleType(TypeKind.TYPE_TIME); + } else { + throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier); + } + } + + // Value conversion: Beam => ZetaSQL public static Value javaObjectToZetaSqlValue(Object object, FieldType fieldType) { if (object == null) { return Value.createNullValue(beamFieldTypeToZetaSqlType(fieldType)); @@ -153,9 +160,11 @@ public static Value javaObjectToZetaSqlValue(Object object, FieldType fieldType) (List) object, fieldType.getCollectionElementType()); case ROW: return beamRowToZetaSqlStructValue((Row) object, fieldType.getRowSchema()); + case LOGICAL_TYPE: + return beamLogicalObjectToZetaSqlValue(fieldType.getLogicalType().getIdentifier(), object); default: throw new UnsupportedOperationException( - "Unsupported Beam fieldType: " + fieldType.getTypeName()); + "Unknown Beam fieldType: " + fieldType.getTypeName()); } } @@ -173,8 +182,7 @@ private static Value javaListToZetaSqlArrayValue(List elements, FieldTyp elements.stream() .map(e -> javaObjectToZetaSqlValue(e, elementType)) .collect(Collectors.toList()); - return Value.createArrayValue( - createZetaSqlArrayTypeFromBeamElementFieldType(elementType), values); + return Value.createArrayValue(beamElementFieldTypeToZetaSqlArrayType(elementType), values); } public static Value beamRowToZetaSqlStructValue(Row row, Schema schema) { @@ -185,9 +193,22 @@ public static Value beamRowToZetaSqlStructValue(Row row, Schema 
schema) { javaObjectToZetaSqlValue( row.getBaseValue(i, Object.class), schema.getField(i).getType())); } - return Value.createStructValue(createZetaSqlStructTypeFromBeamSchema(schema), values); + return Value.createStructValue(beamSchemaToZetaSqlStructType(schema), values); + } + + private static Value beamLogicalObjectToZetaSqlValue(String identifier, Object object) { + if (SqlTypes.DATE.getIdentifier().equals(identifier)) { + if (object instanceof Long) { // base type + return Value.createDateValue(((Long) object).intValue()); + } else { // input type + return Value.createDateValue((int) ((LocalDate) object).toEpochDay()); + } + } else { + throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier); + } } + // Value conversion: ZetaSQL => Beam public static Object zetaSqlValueToJavaObject( Value value, FieldType fieldType, boolean verifyValues) { if (value.isNull()) { @@ -218,9 +239,11 @@ public static Object zetaSqlValueToJavaObject( value, fieldType.getCollectionElementType(), verifyValues); case ROW: return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema(), verifyValues); + case LOGICAL_TYPE: + return zetaSqlValueToBeamLogicalObject(fieldType.getLogicalType().getIdentifier(), value); default: throw new UnsupportedOperationException( - "Unsupported Beam fieldType: " + fieldType.getTypeName()); + "Unknown Beam fieldType: " + fieldType.getTypeName()); } } @@ -250,4 +273,12 @@ private static Row zetaSqlStructValueToBeamRow( : Row.withSchema(schema).attachValues(objects); return row; } + + private static Object zetaSqlValueToBeamLogicalObject(String identifier, Value value) { + if (SqlTypes.DATE.getIdentifier().equals(identifier)) { + return LocalDate.ofEpochDay(value.getDateValue()); + } else { + throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier); + } + } } diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java index 788e1a4dfce6..5ebd2e7c9cbd 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java @@ -798,7 +798,7 @@ private RexNode convertResolvedFunctionCall( if (returnType != null) { op = SqlOperators.createSimpleSqlFunction( - funName, ZetaSqlUtils.zetaSqlTypeToCalciteType(returnType.getKind())); + funName, ZetaSqlUtils.zetaSqlTypeToCalciteTypeName(returnType.getKind())); } else { throw new UnsupportedOperationException("Does not support ZetaSQL function: " + funName); } diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java index b5d8e077fa57..2c2ab69e1b24 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java @@ -36,13 +36,13 @@ import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_FOR_CASE_WHEN; import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_ARRAY; import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_ARRAY_FOR_UNNEST; +import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_DATE; import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_MAP; import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_STRUCT; import static 
org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_STRUCT_TIMESTAMP_STRING; import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_STRUCT_TWO; import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TIMESTAMP_TABLE_ONE; import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TIMESTAMP_TABLE_TWO; -import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TIME_TABLE; import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME; import static org.junit.Assert.assertTrue; @@ -55,6 +55,7 @@ import com.google.zetasql.ZetaSQLType.TypeKind; import com.google.zetasql.ZetaSQLValue.ValueProto; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -74,6 +75,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -2366,6 +2368,285 @@ public void testZetaSQLNestedQueryFive() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + ///////////////////////////////////////////////////////////////////////////// + // DATE type tests + ///////////////////////////////////////////////////////////////////////////// + + @Test + public void testDateLiteral() { + String sql = "SELECT DATE '2020-3-30'"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()) + 
.addValues(LocalDate.of(2020, 3, 30)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateColumn() { + String sql = "SELECT FORMAT_DATE('%b-%d-%Y', date_field) FROM table_with_date"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addStringField("f_date_str").build()) + .addValues("Dec-25-2008") + .build(), + Row.withSchema(Schema.builder().addStringField("f_date_str").build()) + .addValues("Apr-07-2020") + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + // TODO[BEAM-9166]: Add a test for CURRENT_DATE function ("SELECT CURRENT_DATE()") + + @Test + public void testExtractDate() { + String sql = + "WITH Dates AS (\n" + + " SELECT DATE '2015-12-31' AS date UNION ALL\n" + + " SELECT DATE '2016-01-01'\n" + + ")\n" + + "SELECT\n" + + " EXTRACT(ISOYEAR FROM date) AS isoyear,\n" + + " EXTRACT(YEAR FROM date) AS year,\n" + + " EXTRACT(ISOWEEK FROM date) AS isoweek,\n" + // TODO[BEAM-9178]: Add tests for DATE_TRUNC and EXTRACT with "week with weekday" date + // parts once they are supported + // + " EXTRACT(WEEK FROM date) AS week,\n" + + " EXTRACT(MONTH FROM date) AS month\n" + + "FROM Dates\n"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + final Schema schema = + Schema.builder() + .addField("isoyear", FieldType.INT64) + .addField("year", FieldType.INT64) + .addField("isoweek", FieldType.INT64) + // .addField("week", FieldType.INT64) + .addField("month", FieldType.INT64) + 
.build(); + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(schema).addValues(2015L, 2015L, 53L /* , 52L */, 12L).build(), + Row.withSchema(schema).addValues(2015L, 2016L, 53L /* , 0L */, 1L).build()); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateFromYearMonthDay() { + String sql = "SELECT DATE(2008, 12, 25)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()) + .addValues(LocalDate.of(2008, 12, 25)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateFromTimestamp() { + String sql = "SELECT DATE(TIMESTAMP '2016-12-25 05:30:00+07', 'America/Los_Angeles')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()) + .addValues(LocalDate.of(2016, 12, 24)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateAdd() { + String sql = + "SELECT " + + "DATE_ADD(DATE '2008-12-25', INTERVAL 5 DAY), " + + "DATE_ADD(DATE '2008-12-25', INTERVAL 1 MONTH), " + + "DATE_ADD(DATE '2008-12-25', INTERVAL 1 YEAR), "; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = 
BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder() + .addLogicalTypeField("f_date1", SqlTypes.DATE) + .addLogicalTypeField("f_date2", SqlTypes.DATE) + .addLogicalTypeField("f_date3", SqlTypes.DATE) + .build()) + .addValues( + LocalDate.of(2008, 12, 30), + LocalDate.of(2009, 1, 25), + LocalDate.of(2009, 12, 25)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateSub() { + String sql = + "SELECT " + + "DATE_SUB(DATE '2008-12-25', INTERVAL 5 DAY), " + + "DATE_SUB(DATE '2008-12-25', INTERVAL 1 MONTH), " + + "DATE_SUB(DATE '2008-12-25', INTERVAL 1 YEAR), "; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder() + .addLogicalTypeField("f_date1", SqlTypes.DATE) + .addLogicalTypeField("f_date2", SqlTypes.DATE) + .addLogicalTypeField("f_date3", SqlTypes.DATE) + .build()) + .addValues( + LocalDate.of(2008, 12, 20), + LocalDate.of(2008, 11, 25), + LocalDate.of(2007, 12, 25)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateDiff() { + String sql = "SELECT DATE_DIFF(DATE '2010-07-07', DATE '2008-12-25', DAY)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addInt64Field("f_date_diff").build()) + .addValues(559L) + .build()); + 
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateDiffNegativeResult() { + String sql = "SELECT DATE_DIFF(DATE '2017-12-17', DATE '2017-12-18', ISOWEEK)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addInt64Field("f_date_diff").build()) + .addValues(-1L) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTrunc() { + String sql = "SELECT DATE_TRUNC(DATE '2015-06-15', ISOYEAR)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("f_date_trunc", SqlTypes.DATE).build()) + .addValues(LocalDate.of(2014, 12, 29)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testFormatDate() { + String sql = "SELECT FORMAT_DATE('%b-%d-%Y', DATE '2008-12-25')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addStringField("f_date_str").build()) + .addValues("Dec-25-2008") + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testParseDate() { + String 
sql = "SELECT PARSE_DATE('%m %d %y', '10 14 18')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()) + .addValues(LocalDate.of(2018, 10, 14)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateToUnixInt64() { + String sql = "SELECT UNIX_DATE(DATE '2008-12-25')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addInt64Field("f_unix_date").build()) + .addValues(14238L) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateFromUnixInt64() { + String sql = "SELECT DATE_FROM_UNIX_DATE(14238)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()) + .addValues(LocalDate.of(2008, 12, 25)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + ///////////////////////////////////////////////////////////////////////////// // TIMESTAMP type tests ///////////////////////////////////////////////////////////////////////////// @@ -2499,7 +2780,7 @@ public void 
testExtractTimestamp() { + "SELECT\n" + " EXTRACT(ISOYEAR FROM timestamp) AS isoyear,\n" + " EXTRACT(YEAR FROM timestamp) AS year,\n" - + " EXTRACT(ISOWEEK FROM timestamp) AS week,\n" + + " EXTRACT(ISOWEEK FROM timestamp) AS isoweek,\n" // TODO[BEAM-9178]: Add tests for TIMESTAMP_TRUNC and EXTRACT with "week with weekday" // date parts once they are supported // + " EXTRACT(WEEK FROM timestamp) AS week,\n" @@ -4542,7 +4823,6 @@ private void initializeBeamTableProvider() { testBoundedTableMap.put("aggregate_test_table", AGGREGATE_TABLE_ONE); testBoundedTableMap.put("window_test_table", TIMESTAMP_TABLE_ONE); testBoundedTableMap.put("window_test_table_two", TIMESTAMP_TABLE_TWO); - testBoundedTableMap.put("time_test_table", TIME_TABLE); testBoundedTableMap.put("all_null_table", TABLE_ALL_NULL); testBoundedTableMap.put("table_with_struct", TABLE_WITH_STRUCT); testBoundedTableMap.put("table_with_struct_two", TABLE_WITH_STRUCT_TWO); @@ -4554,6 +4834,7 @@ private void initializeBeamTableProvider() { testBoundedTableMap.put("table_all_types", TABLE_ALL_TYPES); testBoundedTableMap.put("table_all_types_2", TABLE_ALL_TYPES_2); testBoundedTableMap.put("table_with_map", TABLE_WITH_MAP); + testBoundedTableMap.put("table_with_date", TABLE_WITH_DATE); testBoundedTableMap.put("table_with_struct_ts_string", TABLE_WITH_STRUCT_TIMESTAMP_STRING); tableProvider = new ReadOnlyTableProvider("test_table_provider", testBoundedTableMap);