From 1d8fbcc9f579ba7be3ecbb05009e134e71b7b9e1 Mon Sep 17 00:00:00 2001 From: Andrew Pilloud Date: Mon, 12 Apr 2021 14:59:17 -0700 Subject: [PATCH 1/2] More tests for time types --- .../sdk/extensions/sql/BeamSqlDslBase.java | 28 +++++++++ .../extensions/sql/BeamSqlDslUdfUdafTest.java | 61 +++++++++++++++++-- 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index 4298c0711cc4..426b95ae6df6 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -23,9 +23,13 @@ import java.math.BigDecimal; import java.text.ParseException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.List; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; @@ -84,6 +88,9 @@ public static void prepareClass() throws ParseException { .addFloatField("f_float") .addDoubleField("f_double") .addStringField("f_string") + .addField("f_date", FieldType.logicalType(SqlTypes.DATE)) + .addField("f_time", FieldType.logicalType(SqlTypes.TIME)) + .addField("f_datetime", FieldType.logicalType(SqlTypes.DATETIME)) .addDateTimeField("f_timestamp") .addInt32Field("f_int2") .addDecimalField("f_decimal") @@ -99,6 +106,9 @@ public static void prepareClass() throws ParseException { 1.0f, 1.0d, "string_row1", + LocalDate.of(2017, 1, 1), + LocalTime.of(1, 1, 3), + LocalDateTime.of(2017, 1, 1, 1, 1, 3), parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), 0, new BigDecimal(1)) @@ -110,6 +120,9 @@ public static void prepareClass() throws ParseException { 2.0f, 2.0d, "string_row2", + LocalDate.of(2017, 1, 1), + LocalTime.of(1, 2, 3), + LocalDateTime.of(2017, 1, 1, 1, 2, 3), parseTimestampWithoutTimeZone("2017-01-01 01:02:03"), 0, new BigDecimal(2)) @@ -121,6 +134,9 @@ public static void prepareClass() throws ParseException { 3.0f, 3.0d, "string_row3", + LocalDate.of(2017, 1, 1), + LocalTime.of(1, 6, 3), + LocalDateTime.of(2017, 1, 1, 1, 6, 3), parseTimestampWithoutTimeZone("2017-01-01 01:06:03"), 0, new BigDecimal(3)) @@ -132,6 +148,9 @@ public static void prepareClass() throws ParseException { 4.0f, 4.0d, "第四行", + LocalDate.of(2017, 1, 1), + LocalTime.of(2, 4, 3), + LocalDateTime.of(2017, 1, 1, 2, 4, 3), parseTimestampWithoutTimeZone("2017-01-01 02:04:03"), 0, new BigDecimal(4)) @@ -147,6 +166,9 @@ public static void prepareClass() throws ParseException { 1.0f, 1.0d, "string_row1", + LocalDate.of(2017, 1, 1), + LocalTime.of(1, 1, 3), + LocalDateTime.of(2017, 1, 1, 1, 1, 3), parseTimestampWithUTCTimeZone("2017-01-01 01:01:03"), 0, new BigDecimal(1)) @@ -158,6 +180,9 @@ public static void prepareClass() throws ParseException { 2.0f, 2.0d, "string_row2", + LocalDate.of(2017, 1, 1), + LocalTime.of(1, 2, 3), + LocalDateTime.of(2017, 1, 1, 1, 2, 3), parseTimestampWithUTCTimeZone("2017-02-01 01:02:03"), 0, new BigDecimal(2)) @@ -169,6 +194,9 @@ public static void prepareClass() throws ParseException { 3.0f, 3.0d, "string_row3", + LocalDate.of(2017, 1, 1), + LocalTime.of(1, 6, 3), + LocalDateTime.of(2017, 1, 1, 1, 6, 3), parseTimestampWithUTCTimeZone("2017-03-01 01:06:03"), 0, new BigDecimal(3)) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java index 9e9d7f936cdd..b563ea1f2e88 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -22,7 +22,11 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; import com.google.auto.service.AutoService; +import java.sql.Date; +import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalTime; import java.util.Arrays; import java.util.Map; import java.util.stream.IntStream; @@ -32,6 +36,7 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -64,9 +69,8 @@ public void testUdaf() throws Exception { pipeline.run().waitUntilFinish(); } - /** Test Joda time UDF. */ @Test - public void testJodaTimeUdf() throws Exception { + public void testTimestampUdaf() throws Exception { Schema resultType = Schema.builder().addDateTimeField("jodatime").build(); Row row = @@ -83,17 +87,48 @@ public void testJodaTimeUdf() throws Exception { pipeline.run().waitUntilFinish(); } - /** Test Joda time UDAF. */ @Test - public void testJodaTimeUdaf() throws Exception { - Schema resultType = Schema.builder().addDateTimeField("jodatime").build(); + public void testDateUdf() throws Exception { + Schema resultType = + Schema.builder().addField("result_date", FieldType.logicalType(SqlTypes.DATE)).build(); + + Row row = Row.withSchema(resultType).addValues(LocalDate.of(2016, 12, 31)).build(); + + String sql = "SELECT PRE_DATE(f_date) as result_date FROM PCOLLECTION WHERE f_int=1"; + PCollection result = + boundedInput1.apply( + "testTimeUdf", SqlTransform.query(sql).registerUdf("PRE_DATE", PreviousDate.class)); + PAssert.that(result).containsInAnyOrder(row); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testTimeUdf() throws Exception { + Schema resultType = + Schema.builder().addField("result_time", FieldType.logicalType(SqlTypes.TIME)).build(); + + Row row = Row.withSchema(resultType).addValues(LocalTime.of(0, 1, 3)).build(); + + String sql = "SELECT PRE_HOUR(f_time) as result_time FROM PCOLLECTION WHERE f_int=1"; + PCollection result = + boundedInput1.apply( + "testTimeUdf", SqlTransform.query(sql).registerUdf("PRE_HOUR", PreviousHour.class)); + PAssert.that(result).containsInAnyOrder(row); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testTimestampUdf() throws Exception { + Schema resultType = Schema.builder().addDateTimeField("result_time").build(); Row row = Row.withSchema(resultType) .addValues(parseTimestampWithoutTimeZone("2016-12-31 01:01:03")) .build(); - String sql = "SELECT PRE_DAY(f_timestamp) as jodatime FROM PCOLLECTION WHERE f_int=1"; + String sql = "SELECT PRE_DAY(f_timestamp) as result_time FROM PCOLLECTION WHERE f_int=1"; PCollection result = boundedInput1.apply( "testTimeUdf", SqlTransform.query(sql).registerUdf("PRE_DAY", PreviousDay.class)); @@ -377,7 +412,21 @@ public static String eval( } } + /** A UDF to test support of date. */ + public static final class PreviousDate implements BeamSqlUdf { + public static Date eval(Date date) { + return new Date(date.getTime() - 24 * 3600 * 1000L); + } + } + /** A UDF to test support of time. */ + public static final class PreviousHour implements BeamSqlUdf { + public static Time eval(Time time) { + return new Time(time.getTime() - 3600 * 1000L); + } + } + + /** A UDF to test support of timestamp. */ public static final class PreviousDay implements BeamSqlUdf { public static Timestamp eval(Timestamp time) { return new Timestamp(time.getTime() - 24 * 3600 * 1000L); From f5021d25e8105f67dd62857eb18ea5d2a6afb4d1 Mon Sep 17 00:00:00 2001 From: Andrew Pilloud Date: Tue, 6 Apr 2021 15:46:03 -0700 Subject: [PATCH 2/2] [BEAM-9379] Output outside of codegen, support rows --- .../extensions/sql/impl/rel/BeamCalcRel.java | 329 +++++++++--------- 1 file changed, 157 insertions(+), 172 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 984d3a70970d..d1e888ec2069 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -18,25 +18,29 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import static org.apache.beam.sdk.schemas.Schema.FieldType; -import static org.apache.beam.sdk.schemas.Schema.TypeName; import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument; import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.lang.reflect.Type; import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.AbstractList; import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TimeZone; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; import org.apache.beam.sdk.extensions.sql.impl.JavaUdfLoader; import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl; @@ -63,7 +67,6 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockBuilder; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Expression; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Expressions; -import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoExpressionKind; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MemberDeclaration; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ParameterExpression; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Types; @@ -79,23 +82,20 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexSimplify; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexUtil; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.SqlFunctions; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.Function; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlOperator; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlConformance; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlUserDefinedFunction; -import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.BuiltInMethod; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; import org.codehaus.commons.compiler.CompileException; import org.codehaus.janino.ScriptEvaluator; import org.joda.time.DateTime; import org.joda.time.Instant; -import org.joda.time.ReadableInstant; /** BeamRelNode to replace {@code Project} and {@code Filter} node. */ @SuppressWarnings({ @@ -108,10 +108,7 @@ public class BeamCalcRel extends AbstractBeamCalcRel { private static final long NANOS_PER_MILLISECOND = 1000000L; private static final long MILLIS_PER_DAY = 86400000L; - private static final ParameterExpression outputSchemaParam = - Expressions.parameter(Schema.class, "outputSchema"); - private static final ParameterExpression processContextParam = - Expressions.parameter(DoFn.ProcessContext.class, "c"); + private static final ParameterExpression rowParam = Expressions.parameter(Row.class, "row"); public BeamCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexProgram program) { super(cluster, traits, input, program); @@ -135,8 +132,8 @@ private class Transform extends PTransform, PCollection expand(PCollectionList pinput) { @@ -155,9 +152,6 @@ public PCollection expand(PCollectionList pinput) { final PhysType physType = PhysTypeImpl.of(typeFactory, getRowType(), JavaRowFormat.ARRAY, false); - Expression input = - Expressions.convert_(Expressions.call(processContextParam, "element"), Row.class); - final RexBuilder rexBuilder = getCluster().getRexBuilder(); final RelMetadataQuery mq = RelMetadataQuery.instance(); final RelOptPredicateList predicates = mq.getPulledUpPredicates(getInput()); @@ -169,7 +163,7 @@ public PCollection expand(PCollectionList pinput) { program, typeFactory, builder, - new InputGetterImpl(input, upstream.getSchema()), + new InputGetterImpl(rowParam, upstream.getSchema()), null, conformance); @@ -181,57 +175,29 @@ public PCollection expand(PCollectionList pinput) { builder, physType, DataContext.ROOT, - new InputGetterImpl(input, upstream.getSchema()), + new InputGetterImpl(rowParam, upstream.getSchema()), null); - boolean verifyRowValues = - pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues(); - - List listValues = Lists.newArrayListWithCapacity(expressions.size()); - for (int index = 0; index < expressions.size(); index++) { - Expression value = expressions.get(index); - FieldType toType = outputSchema.getField(index).getType(); - listValues.add(castOutput(value, toType)); - } - Method newArrayList = Types.lookupMethod(Arrays.class, "asList"); - Expression valueList = Expressions.call(newArrayList, listValues); - - // Expressions.call is equivalent to: output = - // Row.withSchema(outputSchema).attachValue(values); - Expression output = Expressions.call(Row.class, "withSchema", outputSchemaParam); - - if (verifyRowValues) { - Method attachValues = Types.lookupMethod(Row.Builder.class, "addValues", List.class); - output = Expressions.call(output, attachValues, valueList); - output = Expressions.call(output, "build"); - } else { - Method attachValues = Types.lookupMethod(Row.Builder.class, "attachValues", List.class); - output = Expressions.call(output, attachValues, valueList); - } - builder.add( - // Expressions.ifThen is equivalent to: - // if (condition) { - // c.output(output); - // } - Expressions.ifThen( + Expressions.ifThenElse( condition, - Expressions.makeGoto( - GotoExpressionKind.Sequence, - null, - Expressions.call( - processContextParam, - Types.lookupMethod(DoFn.ProcessContext.class, "output", Object.class), - output)))); + Expressions.return_(null, physType.record(expressions)), + Expressions.return_(null, Expressions.constant(null)))); + + BeamSqlPipelineOptions options = + pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class); - CalcFn calcFn = new CalcFn(builder.toBlock().toString(), outputSchema, getJarPaths(program)); + CalcFn calcFn = + new CalcFn( + builder.toBlock().toString(), + outputSchema, + options.getVerifyRowValues(), + getJarPaths(program)); // validate generated code calcFn.compile(); - PCollection projectStream = upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema); - - return projectStream; + return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema); } } @@ -239,12 +205,18 @@ public PCollection expand(PCollectionList pinput) { private static class CalcFn extends DoFn { private final String processElementBlock; private final Schema outputSchema; + private final boolean verifyRowValues; private final List jarPaths; private transient @Nullable ScriptEvaluator se = null; - public CalcFn(String processElementBlock, Schema outputSchema, List jarPaths) { + public CalcFn( + String processElementBlock, + Schema outputSchema, + boolean verifyRowValues, + List jarPaths) { this.processElementBlock = processElementBlock; this.outputSchema = outputSchema; + this.verifyRowValues = verifyRowValues; this.jarPaths = jarPaths; } @@ -260,12 +232,9 @@ ScriptEvaluator compile() { } } se.setParameters( - new String[] {outputSchemaParam.name, processContextParam.name, DataContext.ROOT.name}, - new Class[] { - (Class) outputSchemaParam.getType(), - (Class) processContextParam.getType(), - (Class) DataContext.ROOT.getType() - }); + new String[] {rowParam.name, DataContext.ROOT.name}, + new Class[] {(Class) rowParam.getType(), (Class) DataContext.ROOT.getType()}); + se.setReturnType(Object[].class); try { se.cook(processElementBlock); } catch (CompileException e) { @@ -283,12 +252,17 @@ public void setup() { @ProcessElement public void processElement(ProcessContext c) { assert se != null; + final Object[] v; try { - se.evaluate(new Object[] {outputSchema, c, CONTEXT_INSTANCE}); + v = (Object[]) se.evaluate(new Object[] {c.element(), CONTEXT_INSTANCE}); } catch (InvocationTargetException e) { throw new RuntimeException( "CalcFn failed to evaluate: " + processElementBlock, e.getCause()); } + if (v != null) { + Row row = toBeamRow(Arrays.asList(v), outputSchema, verifyRowValues); + c.output(row); + } } } @@ -311,110 +285,126 @@ private static List getJarPaths(RexProgram program) { return jarPaths.build(); } - private static final Map rawTypeMap = - ImmutableMap.builder() - .put(TypeName.BYTE, Byte.class) - .put(TypeName.INT16, Short.class) - .put(TypeName.INT32, Integer.class) - .put(TypeName.INT64, Long.class) - .put(TypeName.FLOAT, Float.class) - .put(TypeName.DOUBLE, Double.class) - .build(); - - private static Expression castOutput(Expression value, FieldType toType) { - Expression returnValue = value; - if (value.getType() == Object.class || !(value.getType() instanceof Class)) { - // fast copy path, just pass object through - returnValue = value; - } else if (CalciteUtils.isDateTimeType(toType) - && !Types.isAssignableFrom(ReadableInstant.class, (Class) value.getType())) { - returnValue = castOutputTime(value, toType); - } else if (toType.getTypeName() == TypeName.DECIMAL - && !Types.isAssignableFrom(BigDecimal.class, (Class) value.getType())) { - returnValue = Expressions.new_(BigDecimal.class, value); - } else if (toType.getTypeName() == TypeName.BYTES - && Types.isAssignableFrom(ByteString.class, (Class) value.getType())) { - returnValue = - Expressions.condition( - Expressions.equal(value, Expressions.constant(null)), - Expressions.constant(null), - Expressions.call(value, "getBytes")); - } else if (((Class) value.getType()).isPrimitive() - || Types.isAssignableFrom(Number.class, (Class) value.getType())) { - Type rawType = rawTypeMap.get(toType.getTypeName()); - if (rawType != null) { - returnValue = Types.castIfNecessary(rawType, value); - } - } else if (Types.isAssignableFrom(Iterable.class, value.getType())) { - // Passing an Iterable into newArrayList gets interpreted to mean copying each individual - // element. We want the - // entire Iterable to be treated as a single element, so we cast to Object. - returnValue = Expressions.convert_(value, Object.class); + static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValues) { + if (value == null) { + return null; + } + switch (fieldType.getTypeName()) { + // BEAM-12176: Numbers aren't always the type we expect. + case BYTE: + return ((Number) value).byteValue(); + case INT16: + return ((Number) value).shortValue(); + case INT32: + return ((Number) value).intValue(); + case INT64: + return ((Number) value).longValue(); + case FLOAT: + return ((Number) value).floatValue(); + case DOUBLE: + return ((Number) value).doubleValue(); + case DECIMAL: + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } else if (value instanceof Long) { + return BigDecimal.valueOf((Long) value); + } else if (value instanceof Integer) { + return BigDecimal.valueOf((Integer) value); + } + return new BigDecimal(((Number) value).toString()); + case STRING: + return (String) value; + case BOOLEAN: + return (Boolean) value; + case DATETIME: + if (value instanceof Timestamp) { + value = SqlFunctions.toLong((Timestamp) value); + } + return Instant.ofEpochMilli(((Number) value).longValue()); + case BYTES: + if (value instanceof byte[]) { + return value; + } + return ((ByteString) value).getBytes(); + case ARRAY: + return toBeamList((List) value, fieldType.getCollectionElementType(), verifyValues); + case MAP: + return toBeamMap( + (Map) value, + fieldType.getMapKeyType(), + fieldType.getMapValueType(), + verifyValues); + case ROW: + if (value instanceof Object[]) { + value = Arrays.asList((Object[]) value); + } + return toBeamRow((List) value, fieldType.getRowSchema(), verifyValues); + case LOGICAL_TYPE: + String identifier = fieldType.getLogicalType().getIdentifier(); + if (CharType.IDENTIFIER.equals(identifier)) { + return (String) value; + } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { + return Instant.ofEpochMilli(((Number) value).longValue()); + } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { + if (value instanceof Date) { + value = SqlFunctions.toInt((Date) value); + } + // BEAM-12175: value should always be Integer here, but it isn't. + return LocalDate.ofEpochDay(((Number) value).longValue()); + } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) { + if (value instanceof Time) { + value = SqlFunctions.toInt((Time) value); + } + // BEAM-12175: value should always be Integer here, but it isn't. + return LocalTime.ofNanoOfDay(((Number) value).longValue() * NANOS_PER_MILLISECOND); + } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) { + if (value instanceof Timestamp) { + value = SqlFunctions.toLong((Timestamp) value); + } + return LocalDateTime.of( + LocalDate.ofEpochDay(((Number) value).longValue() / MILLIS_PER_DAY), + LocalTime.ofNanoOfDay( + (((Number) value).longValue() % MILLIS_PER_DAY) * NANOS_PER_MILLISECOND)); + } else { + throw new UnsupportedOperationException("Unable to convert logical type " + identifier); + } + default: + throw new UnsupportedOperationException("Unable to convert " + fieldType.getTypeName()); } - returnValue = - Expressions.condition( - Expressions.equal(value, Expressions.constant(null)), - Expressions.constant(null), - returnValue); - return returnValue; } - private static Expression castOutputTime(Expression value, FieldType toType) { - Expression valueDateTime = value; + private static List toBeamList( + List arrayValue, FieldType elementType, boolean verifyValues) { + return arrayValue.stream() + .map(e -> toBeamObject(e, elementType, verifyValues)) + .collect(Collectors.toList()); + } - if (CalciteUtils.TIMESTAMP.typesEqual(toType) - || CalciteUtils.NULLABLE_TIMESTAMP.typesEqual(toType)) { - // Convert TIMESTAMP to joda Instant - if (value.getType() == java.sql.Timestamp.class) { - valueDateTime = Expressions.call(BuiltInMethod.TIMESTAMP_TO_LONG.method, valueDateTime); - } - valueDateTime = Expressions.new_(Instant.class, valueDateTime); - } else if (CalciteUtils.TIME.typesEqual(toType) - || CalciteUtils.NULLABLE_TIME.typesEqual(toType)) { - // Convert TIME to LocalTime - if (value.getType() == java.sql.Time.class) { - valueDateTime = Expressions.call(BuiltInMethod.TIME_TO_INT.method, valueDateTime); - } else if (value.getType() == Integer.class || value.getType() == Long.class) { - valueDateTime = Expressions.unbox(valueDateTime); - } - valueDateTime = - Expressions.multiply(valueDateTime, Expressions.constant(NANOS_PER_MILLISECOND)); - valueDateTime = Expressions.call(LocalTime.class, "ofNanoOfDay", valueDateTime); - } else if (CalciteUtils.DATE.typesEqual(toType) - || CalciteUtils.NULLABLE_DATE.typesEqual(toType)) { - // Convert DATE to LocalDate - if (value.getType() == java.sql.Date.class) { - valueDateTime = Expressions.call(BuiltInMethod.DATE_TO_INT.method, valueDateTime); - } else if (value.getType() == Integer.class || value.getType() == Long.class) { - valueDateTime = Expressions.unbox(valueDateTime); - } - valueDateTime = Expressions.call(LocalDate.class, "ofEpochDay", valueDateTime); - } else if (CalciteUtils.TIMESTAMP_WITH_LOCAL_TZ.typesEqual(toType) - || CalciteUtils.NULLABLE_TIMESTAMP_WITH_LOCAL_TZ.typesEqual(toType)) { - // Convert TimeStamp_With_Local_TimeZone to LocalDateTime - Expression dateValue = - Expressions.divide(valueDateTime, Expressions.constant(MILLIS_PER_DAY)); - Expression date = Expressions.call(LocalDate.class, "ofEpochDay", dateValue); - Expression timeValue = - Expressions.multiply( - Expressions.modulo(valueDateTime, Expressions.constant(MILLIS_PER_DAY)), - Expressions.constant(NANOS_PER_MILLISECOND)); - Expression time = Expressions.call(LocalTime.class, "ofNanoOfDay", timeValue); - valueDateTime = Expressions.call(LocalDateTime.class, "of", date, time); - } else { - throw new UnsupportedOperationException("Unknown DateTime type " + toType); + private static Map toBeamMap( + Map mapValue, + FieldType keyType, + FieldType elementType, + boolean verifyValues) { + Map output = new HashMap<>(mapValue.size()); + for (Map.Entry entry : mapValue.entrySet()) { + output.put( + toBeamObject(entry.getKey(), keyType, verifyValues), + toBeamObject(entry.getValue(), elementType, verifyValues)); } + return output; + } - // make conversion conditional on non-null input. - if (!((Class) value.getType()).isPrimitive()) { - valueDateTime = - Expressions.condition( - Expressions.equal(value, Expressions.constant(null)), - Expressions.constant(null), - valueDateTime); + private static Row toBeamRow(List structValue, Schema schema, boolean verifyValues) { + List objects = new ArrayList<>(schema.getFieldCount()); + assert structValue.size() == schema.getFieldCount(); + for (int i = 0; i < structValue.size(); i++) { + objects.add(toBeamObject(structValue.get(i), schema.getField(i).getType(), verifyValues)); } - - return valueDateTime; + Row row = + verifyValues + ? Row.withSchema(schema).addValues(objects).build() + : Row.withSchema(schema).attachValues(objects); + return row; } private static class InputGetterImpl implements RexToLixTranslator.InputGetter { @@ -429,12 +419,12 @@ private InputGetterImpl(Expression input, Schema inputSchema) { @Override public Expression field(BlockBuilder list, int index, Type storageType) { - return getBeamField(list, index, storageType, input, inputSchema); + return getBeamField(list, index, input, inputSchema); } // Read field from Beam Row private static Expression getBeamField( - BlockBuilder list, int index, Type storageType, Expression input, Schema schema) { + BlockBuilder list, int index, Expression input, Schema schema) { if (index >= schema.getFieldCount() || index < 0) { throw new IllegalArgumentException("Unable to find value #" + index); } @@ -479,11 +469,6 @@ private static Expression getBeamField( break; case ARRAY: value = Expressions.call(expression, "getArray", Expressions.constant(index)); - if (storageType == Object.class - && TypeName.ROW.equals(fieldType.getCollectionElementType().getTypeName())) { - // Workaround for missing row output support - return Expressions.convert_(value, Object.class); - } break; case MAP: value = Expressions.call(expression, "getMap", Expressions.constant(index)); @@ -655,7 +640,7 @@ private static Expression toCalciteRow(Expression input, Schema schema) { for (int i = 0; i < schema.getFieldCount(); i++) { BlockBuilder list = new BlockBuilder(/* optimizing= */ false, body); - Expression returnValue = getBeamField(list, i, /* storageType= */ null, row, schema); + Expression returnValue = getBeamField(list, i, row, schema); list.append(returnValue);