From 4d222c571b71b3fb093eb3d19d52968da4e84f8f Mon Sep 17 00:00:00 2001 From: rahul8383 Date: Fri, 1 May 2020 00:12:55 +0530 Subject: [PATCH 1/2] [BEAM-8307] NPE in Calcite dialect when input PCollection has logical type in schema, from JdbcIO Transform --- .../sdk/schemas/logicaltypes/FixedBytes.java | 44 +--- .../logicaltypes/FixedLengthString.java | 55 +++++ .../IdenticalBaseTAndInputTLogicalType.java | 82 +++++++ .../schemas/logicaltypes/LogicalDecimal.java | 69 ++++++ .../logicaltypes/PassThroughLogicalType.java | 35 +-- .../logicaltypes/VariableLengthBytes.java | 55 +++++ .../logicaltypes/VariableLengthString.java | 55 +++++ .../logicaltypes/LogicalTypesTest.java | 22 ++ .../org/apache/beam/sdk/values/RowTest.java | 50 ++++ .../extensions/sql/impl/rel/BeamCalcRel.java | 9 +- .../sql/impl/utils/CalciteUtils.java | 51 +++- .../sql/impl/utils/CalciteUtilsTest.java | 142 ++++++++---- .../org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 66 +++--- .../apache/beam/sdk/io/jdbc/LogicalTypes.java | 219 ------------------ .../apache/beam/sdk/io/jdbc/SchemaUtil.java | 83 +++---- .../apache/beam/sdk/io/jdbc/JdbcIOTest.java | 6 +- .../beam/sdk/io/jdbc/SchemaUtilTest.java | 43 ++-- 17 files changed, 638 insertions(+), 448 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedLengthString.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/IdenticalBaseTAndInputTLogicalType.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalDecimal.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/VariableLengthBytes.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/VariableLengthString.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedBytes.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedBytes.java index 4022c634acdf..7f0d56ef0515 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedBytes.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedBytes.java @@ -19,19 +19,18 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; -import java.util.Arrays; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.Schema.LogicalType; +import org.apache.beam.sdk.schemas.Schema; /** A LogicalType representing a fixed-size byte array. */ @Experimental(Kind.SCHEMAS) -public class FixedBytes implements LogicalType { - public static final String IDENTIFIER = "FixedBytes"; +public class FixedBytes extends IdenticalBaseTAndInputTLogicalType { + public static final String IDENTIFIER = "beam:logical_type:fixed_length_bytes:v1"; private final int byteArraySize; private FixedBytes(int byteArraySize) { + super(IDENTIFIER, Schema.FieldType.INT32, byteArraySize, Schema.FieldType.BYTES); this.byteArraySize = byteArraySize; } @@ -43,44 +42,15 @@ public int getLength() { return byteArraySize; } - @Override - public String getIdentifier() { - return IDENTIFIER; - } - - @Override - public FieldType getArgumentType() { - return FieldType.INT32; - } - - @Override - public Integer getArgument() { - return byteArraySize; - } - - @Override - public FieldType getBaseType() { - return FieldType.BYTES; - } - @Override public byte[] toBaseType(byte[] input) { - checkArgument(input.length == byteArraySize); + checkArgument(input == null || input.length == byteArraySize); return input; } @Override public byte[] toInputType(byte[] base) { - checkArgument(base.length <= byteArraySize); - if (base.length == byteArraySize) { - return base; - } else { - 
return Arrays.copyOf(base, byteArraySize); - } - } - - @Override - public String toString() { - return "FixedBytes: " + byteArraySize; + checkArgument(base == null || base.length == byteArraySize); + return base; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedLengthString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedLengthString.java new file mode 100644 index 000000000000..4f5a1b83e446 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedLengthString.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; + +/** A LogicalType representing a fixed-length string. 
*/ +@Experimental(Experimental.Kind.SCHEMAS) +public class FixedLengthString extends IdenticalBaseTAndInputTLogicalType { + public static final String IDENTIFIER = "beam:logical_type:fixed_length_string:v1"; + private final int length; + + private FixedLengthString(int length) { + super(IDENTIFIER, Schema.FieldType.INT32, length, Schema.FieldType.STRING); + this.length = length; + } + + public int getLength() { + return length; + } + + public static FixedLengthString of(int length) { + return new FixedLengthString(length); + } + + @Override + public String toBaseType(String input) { + checkArgument(input == null || input.length() == length); + return input; + } + + @Override + public String toInputType(String base) { + checkArgument(base == null || base.length() == length); + return base; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/IdenticalBaseTAndInputTLogicalType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/IdenticalBaseTAndInputTLogicalType.java new file mode 100644 index 000000000000..ff0af5af1370 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/IdenticalBaseTAndInputTLogicalType.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import java.util.Objects; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; + +/** A base class for LogicalTypes that use the same input type as the underlying base type. */ +@Experimental(Experimental.Kind.SCHEMAS) +public abstract class IdenticalBaseTAndInputTLogicalType implements Schema.LogicalType { + protected final String identifier; + protected final Schema.FieldType argumentType; + protected final Object argument; + protected final Schema.FieldType baseType; + + protected IdenticalBaseTAndInputTLogicalType( + String identifier, + Schema.FieldType argumentType, + Object argument, + Schema.FieldType baseType) { + this.identifier = identifier; + this.argumentType = argumentType; + this.argument = argument; + this.baseType = baseType; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public Schema.FieldType getArgumentType() { + return argumentType; + } + + @Override + @SuppressWarnings("TypeParameterUnusedInFormals") + public ArgumentT getArgument() { + return (ArgumentT) argument; + } + + @Override + public Schema.FieldType getBaseType() { + return baseType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IdenticalBaseTAndInputTLogicalType that = (IdenticalBaseTAndInputTLogicalType) o; + return identifier.equals(that.identifier) + && Objects.equals(argument, that.argument) + && baseType.equals(that.baseType); + } + + @Override + public int hashCode() { + return Objects.hash(identifier, argument, baseType); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalDecimal.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalDecimal.java new file mode 100644 index 000000000000..14384559dcd8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalDecimal.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.math.BigDecimal; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; + +/** A LogicalType representing a Decimal type with custom precision and scale. 
*/ +@Experimental(Experimental.Kind.SCHEMAS) +public class LogicalDecimal extends IdenticalBaseTAndInputTLogicalType { + public static final String IDENTIFIER = "beam:logical_type:decimal:v1"; + private static final Schema schema = + Schema.builder().addInt32Field("precision").addInt32Field("scale").build(); + private final int precision; + private final int scale; + + private LogicalDecimal(int precision, int scale) { + super( + IDENTIFIER, + Schema.FieldType.row(schema), + Row.withSchema(schema).addValues(precision, scale).build(), + Schema.FieldType.DECIMAL); + this.precision = precision; + this.scale = scale; + } + + public static LogicalDecimal of(int precision, int scale) { + return new LogicalDecimal(precision, scale); + } + + public int getPrecision() { + return precision; + } + + public int getScale() { + return scale; + } + + @Override + public BigDecimal toBaseType(BigDecimal input) { + checkArgument(input == null || (input.precision() == precision && input.scale() == scale)); + return input; + } + + @Override + public BigDecimal toInputType(BigDecimal base) { + checkArgument(base == null || (base.precision() == precision && base.scale() == scale)); + return base; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PassThroughLogicalType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PassThroughLogicalType.java index a42182f95aec..f8339682f2ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PassThroughLogicalType.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PassThroughLogicalType.java @@ -20,43 +20,14 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.Schema.LogicalType; -/** A base class for LogicalTypes that use the same Java type as the underlying base type. 
*/ +/** A base class for LogicalTypes that use the same value as the underlying base value. */ @Experimental(Kind.SCHEMAS) -public abstract class PassThroughLogicalType implements LogicalType { - private final String identifier; - private final FieldType argumentType; - private final Object argument; - private final FieldType fieldType; +public abstract class PassThroughLogicalType extends IdenticalBaseTAndInputTLogicalType { protected PassThroughLogicalType( String identifier, FieldType argumentType, Object argument, FieldType fieldType) { - this.identifier = identifier; - this.argumentType = argumentType; - this.argument = argument; - this.fieldType = fieldType; - } - - @Override - public String getIdentifier() { - return identifier; - } - - @Override - public FieldType getArgumentType() { - return argumentType; - } - - @Override - @SuppressWarnings("TypeParameterUnusedInFormals") - public ArgumentT getArgument() { - return (ArgumentT) argument; - } - - @Override - public FieldType getBaseType() { - return fieldType; + super(identifier, argumentType, argument, fieldType); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/VariableLengthBytes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/VariableLengthBytes.java new file mode 100644 index 000000000000..1dee15f91878 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/VariableLengthBytes.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; + +/** A LogicalType representing a variable-size byte array. */ +@Experimental(Experimental.Kind.SCHEMAS) +public class VariableLengthBytes extends IdenticalBaseTAndInputTLogicalType { + public static final String IDENTIFIER = "beam:logical_type:variable_length_bytes:v1"; + private final int byteArraySize; + + private VariableLengthBytes(int byteArraySize) { + super(IDENTIFIER, Schema.FieldType.INT32, byteArraySize, Schema.FieldType.BYTES); + this.byteArraySize = byteArraySize; + } + + public static VariableLengthBytes of(int byteArraySize) { + return new VariableLengthBytes(byteArraySize); + } + + public int getLength() { + return byteArraySize; + } + + @Override + public byte[] toBaseType(byte[] input) { + checkArgument(input == null || input.length <= byteArraySize); + return input; + } + + @Override + public byte[] toInputType(byte[] base) { + checkArgument(base == null || base.length <= byteArraySize); + return base; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/VariableLengthString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/VariableLengthString.java new file mode 100644 index 000000000000..87ae89651dd6 --- /dev/null +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/VariableLengthString.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; + +/** A LogicalType representing a variable-length string. 
*/ +@Experimental(Experimental.Kind.SCHEMAS) +public class VariableLengthString extends IdenticalBaseTAndInputTLogicalType { + public static final String IDENTIFIER = "beam:logical_type:variable_length_string:v1"; + private final int length; + + private VariableLengthString(int length) { + super(IDENTIFIER, Schema.FieldType.INT32, length, Schema.FieldType.STRING); + this.length = length; + } + + public static VariableLengthString of(int length) { + return new VariableLengthString(length); + } + + public int getLength() { + return length; + } + + @Override + public String toBaseType(String input) { + checkArgument(input == null || input.length() <= length); + return input; + } + + @Override + public String toInputType(String base) { + checkArgument(base == null || base.length() <= length); + return base; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java index f52f36057a3a..b8d96642007a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.schemas.logicaltypes; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import java.time.Duration; import java.time.Instant; @@ -97,4 +98,25 @@ public void testNanosDuration() { assertEquals(duration, row.getLogicalTypeValue(0, NanosDuration.class)); assertEquals(durationAsRow, row.getBaseValue(0, Row.class)); } + + @Test + // tests IdenticalBaseTAndInputTLogicalType equals + public void testLogicalTypesEquals() { + // difference in IDENTIFIER + assertNotEquals(FixedBytes.of(10), VariableLengthBytes.of(10)); + assertNotEquals(FixedLengthString.of(10), VariableLengthString.of(10)); + + // difference in argument value + 
assertNotEquals(FixedBytes.of(5), FixedBytes.of(10)); + assertNotEquals(FixedLengthString.of(5), FixedLengthString.of(10)); + + // difference in scale + assertNotEquals(LogicalDecimal.of(10, 5), LogicalDecimal.of(10, 2)); + + // difference in precision + assertNotEquals(LogicalDecimal.of(8, 5), LogicalDecimal.of(10, 5)); + + assertEquals(FixedBytes.of(10), FixedBytes.of(10)); + assertEquals(LogicalDecimal.of(10, 5), LogicalDecimal.of(10, 5)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java index 7134e02eef61..901cd90f2e4c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java @@ -36,6 +36,10 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.FixedLengthString; +import org.apache.beam.sdk.schemas.logicaltypes.LogicalDecimal; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; @@ -762,4 +766,50 @@ public void testFixedBytes() { Row row = Row.withSchema(schema).withFieldValue("char", byteArray).build(); assertTrue(Arrays.equals(byteArray, row.getLogicalTypeValue("char", byte[].class))); } + + @Test + public void testNullLogicalTypes() { + + String lengthTenString = "length ten"; + byte[] lengthFiveByteArray = {1, 2, 3, 4, 5}; + BigDecimal customDecimal = new BigDecimal("12345.12345"); // precision=10, scale=5 + Schema schema = + 
Schema.builder() + .addField("char", FieldType.logicalType(FixedLengthString.of(10))) + .addNullableField("nullChar", FieldType.logicalType(FixedLengthString.of(10))) + .addField("varchar", FieldType.logicalType(VariableLengthString.of(100))) + .addNullableField("nullVarchar", FieldType.logicalType(VariableLengthString.of(100))) + .addField("binary", FieldType.logicalType(FixedBytes.of(5))) + .addNullableField("nullBinary", FieldType.logicalType(FixedBytes.of(5))) + .addField("varbinary", FieldType.logicalType(VariableLengthBytes.of(100))) + .addNullableField("nullVarbinary", FieldType.logicalType(VariableLengthBytes.of(100))) + .addField("customDecimal", FieldType.logicalType(LogicalDecimal.of(10, 5))) + .addNullableField("nullCustomDecimal", FieldType.logicalType(LogicalDecimal.of(10, 5))) + .build(); + + Row row = + Row.withSchema(schema) + .withFieldValue("char", lengthTenString) + .withFieldValue("nullChar", null) + .withFieldValue("varchar", lengthTenString) + .withFieldValue("nullVarchar", null) + .withFieldValue("binary", lengthFiveByteArray) + .withFieldValue("nullBinary", null) + .withFieldValue("varbinary", lengthFiveByteArray) + .withFieldValue("nullVarbinary", null) + .withFieldValue("customDecimal", customDecimal) + .withFieldValue("nullCustomDecimal", null) + .build(); + + assertEquals(lengthTenString, row.getLogicalTypeValue("char", String.class)); + assertEquals(null, row.getLogicalTypeValue("nullChar", String.class)); + assertEquals(lengthTenString, row.getLogicalTypeValue("varchar", String.class)); + assertEquals(null, row.getLogicalTypeValue("nullVarchar", String.class)); + assertEquals(lengthFiveByteArray, row.getLogicalTypeValue("binary", byte[].class)); + assertEquals(null, row.getLogicalTypeValue("nullBinary", byte[].class)); + assertEquals(lengthFiveByteArray, row.getLogicalTypeValue("varbinary", byte[].class)); + assertEquals(null, row.getLogicalTypeValue("nullVarbinary", byte[].class)); + assertEquals(customDecimal, 
row.getLogicalTypeValue("customDecimal", BigDecimal.class)); + assertEquals(null, row.getLogicalTypeValue("nullCustomDecimal", BigDecimal.class)); + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index b9821aa1f237..27abf9643a6c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -316,18 +316,15 @@ private static Expression castOutputTime(Expression value, FieldType toType) { Expression valueDateTime = value; // First, convert to millis - if (CalciteUtils.TIMESTAMP.typesEqual(toType) - || CalciteUtils.NULLABLE_TIMESTAMP.typesEqual(toType)) { + if (CalciteUtils.TIMESTAMP.typesEqual(toType.withNullable(false))) { if (value.getType() == java.sql.Timestamp.class) { valueDateTime = Expressions.call(BuiltInMethod.TIMESTAMP_TO_LONG.method, valueDateTime); } - } else if (CalciteUtils.TIME.typesEqual(toType) - || CalciteUtils.NULLABLE_TIME.typesEqual(toType)) { + } else if (CalciteUtils.TIME.typesEqual(toType.withNullable(false))) { if (value.getType() == java.sql.Time.class) { valueDateTime = Expressions.call(BuiltInMethod.TIME_TO_INT.method, valueDateTime); } - } else if (CalciteUtils.DATE.typesEqual(toType) - || CalciteUtils.NULLABLE_DATE.typesEqual(toType)) { + } else if (CalciteUtils.DATE.typesEqual(toType.withNullable(false))) { if (value.getType() == java.sql.Date.class) { valueDateTime = Expressions.call(BuiltInMethod.DATE_TO_INT.method, valueDateTime); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index e0b994d5815f..029a0c516c37 100644 --- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -24,7 +24,12 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.FixedLengthString; +import org.apache.beam.sdk.schemas.logicaltypes.LogicalDecimal; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthString; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap; @@ -129,15 +134,10 @@ public static boolean isStringType(FieldType fieldType) { public static final FieldType VARCHAR = FieldType.STRING; public static final FieldType CHAR = FieldType.logicalType(new CharType()); public static final FieldType DATE = FieldType.logicalType(new DateType()); - public static final FieldType NULLABLE_DATE = - FieldType.logicalType(new DateType()).withNullable(true); public static final FieldType TIME = FieldType.logicalType(new TimeType()); - public static final FieldType NULLABLE_TIME = - FieldType.logicalType(new TimeType()).withNullable(true); public static final FieldType TIME_WITH_LOCAL_TZ = FieldType.logicalType(new TimeWithLocalTzType()); public static final FieldType TIMESTAMP = FieldType.DATETIME; - public static final FieldType NULLABLE_TIMESTAMP = FieldType.DATETIME.withNullable(true); public static final FieldType TIMESTAMP_WITH_LOCAL_TZ = FieldType.logicalType(new TimestampWithLocalTzType()); @@ -153,12 
+153,7 @@ public static boolean isStringType(FieldType fieldType) { .put(BOOLEAN, SqlTypeName.BOOLEAN) .put(VARBINARY, SqlTypeName.VARBINARY) .put(VARCHAR, SqlTypeName.VARCHAR) - .put(CHAR, SqlTypeName.CHAR) - .put(DATE, SqlTypeName.DATE) - .put(TIME, SqlTypeName.TIME) - .put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE) .put(TIMESTAMP, SqlTypeName.TIMESTAMP) - .put(TIMESTAMP_WITH_LOCAL_TZ, SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) .build(); private static final ImmutableMap CALCITE_TO_BEAM_TYPE_MAPPING = @@ -203,6 +198,42 @@ public static SqlTypeName toSqlTypeName(FieldType type) { return SqlTypeName.ARRAY; case MAP: return SqlTypeName.MAP; + case LOGICAL_TYPE: + { + String identifier = type.getLogicalType().getIdentifier(); + // As every logical type should have a globally unique identifier, the identifier of a + // logical type can be used to determine SqlTypeName. + // By using the identifier of a logical type to determine the calcite type, the calcite + // SqlTypeName can be determined irrespective of the argument of the logical type. For + // example, varchar(10), varchar(100) both map to SqlTypeName.VARCHAR irrespective of the 
+ switch (identifier) { + case CharType.IDENTIFIER: + case FixedLengthString.IDENTIFIER: + return SqlTypeName.CHAR; + case DateType.IDENTIFIER: + return SqlTypeName.DATE; + case TimeType.IDENTIFIER: + return SqlTypeName.TIME; + case TimeWithLocalTzType.IDENTIFIER: + return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE; + case TimestampWithLocalTzType.IDENTIFIER: + return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE; + case FixedBytes.IDENTIFIER: + return SqlTypeName.BINARY; + case VariableLengthBytes.IDENTIFIER: + return SqlTypeName.VARBINARY; + case VariableLengthString.IDENTIFIER: + return SqlTypeName.VARCHAR; + case LogicalDecimal.IDENTIFIER: + return SqlTypeName.DECIMAL; + default: + throw new IllegalArgumentException( + String.format( + "Cannot find a matching Calcite SqlTypeName for Beam logical type: %s", + type)); + } + } default: SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(type.withNullable(false)); if (typeName != null) { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java index 50b6ab29d3ac..14be946f8579 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java @@ -24,6 +24,11 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.FixedLengthString; +import org.apache.beam.sdk.schemas.logicaltypes.LogicalDecimal; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthString; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem; @@ -59,68 +64,121 @@ Map calciteRowTypeFields(Schema schema) { public void testToCalciteRowType() { final Schema schema = Schema.builder() - .addField("f1", Schema.FieldType.BYTE) - .addField("f2", Schema.FieldType.INT16) - .addField("f3", Schema.FieldType.INT32) - .addField("f4", Schema.FieldType.INT64) - .addField("f5", Schema.FieldType.FLOAT) - .addField("f6", Schema.FieldType.DOUBLE) - .addField("f7", Schema.FieldType.DECIMAL) - .addField("f8", Schema.FieldType.BOOLEAN) - .addField("f9", Schema.FieldType.BYTES) - .addField("f10", Schema.FieldType.STRING) + .addField("byte", Schema.FieldType.BYTE) + .addField("short", Schema.FieldType.INT16) + .addField("int", Schema.FieldType.INT32) + .addField("long", Schema.FieldType.INT64) + .addField("float", Schema.FieldType.FLOAT) + .addField("double", Schema.FieldType.DOUBLE) + .addField("decimal", Schema.FieldType.DECIMAL) + .addField("boolean", Schema.FieldType.BOOLEAN) + .addField("byteArray", Schema.FieldType.BYTES) + .addField("string", Schema.FieldType.STRING) + .addField("char", CalciteUtils.CHAR) + .addField("date", CalciteUtils.DATE) + .addField("time", CalciteUtils.TIME) + .addField("timestamp", CalciteUtils.TIMESTAMP) + .addField("timeWithLocalTZ", CalciteUtils.TIME_WITH_LOCAL_TZ) + .addField("timestampWithLocalTZ", CalciteUtils.TIMESTAMP_WITH_LOCAL_TZ) + .addField("fixedBytes", Schema.FieldType.logicalType(FixedBytes.of(10))) + .addField("variableBytes", Schema.FieldType.logicalType(VariableLengthBytes.of(100))) + .addField("fixedString", Schema.FieldType.logicalType(FixedLengthString.of(10))) + .addField("variableString", Schema.FieldType.logicalType(VariableLengthString.of(100))) + .addField("customDecimal", Schema.FieldType.logicalType(LogicalDecimal.of(10, 5))) .build(); final Map fields = calciteRowTypeFields(schema); - 
assertEquals(10, fields.size()); + assertEquals(21, fields.size()); fields.values().forEach(x -> assertFalse(x.isNullable())); - assertEquals(SqlTypeName.TINYINT, fields.get("f1").getSqlTypeName()); - assertEquals(SqlTypeName.SMALLINT, fields.get("f2").getSqlTypeName()); - assertEquals(SqlTypeName.INTEGER, fields.get("f3").getSqlTypeName()); - assertEquals(SqlTypeName.BIGINT, fields.get("f4").getSqlTypeName()); - assertEquals(SqlTypeName.FLOAT, fields.get("f5").getSqlTypeName()); - assertEquals(SqlTypeName.DOUBLE, fields.get("f6").getSqlTypeName()); - assertEquals(SqlTypeName.DECIMAL, fields.get("f7").getSqlTypeName()); - assertEquals(SqlTypeName.BOOLEAN, fields.get("f8").getSqlTypeName()); - assertEquals(SqlTypeName.VARBINARY, fields.get("f9").getSqlTypeName()); - assertEquals(SqlTypeName.VARCHAR, fields.get("f10").getSqlTypeName()); + assertEquals(SqlTypeName.TINYINT, fields.get("byte").getSqlTypeName()); + assertEquals(SqlTypeName.SMALLINT, fields.get("short").getSqlTypeName()); + assertEquals(SqlTypeName.INTEGER, fields.get("int").getSqlTypeName()); + assertEquals(SqlTypeName.BIGINT, fields.get("long").getSqlTypeName()); + assertEquals(SqlTypeName.FLOAT, fields.get("float").getSqlTypeName()); + assertEquals(SqlTypeName.DOUBLE, fields.get("double").getSqlTypeName()); + assertEquals(SqlTypeName.DECIMAL, fields.get("decimal").getSqlTypeName()); + assertEquals(SqlTypeName.BOOLEAN, fields.get("boolean").getSqlTypeName()); + assertEquals(SqlTypeName.VARBINARY, fields.get("byteArray").getSqlTypeName()); + assertEquals(SqlTypeName.VARCHAR, fields.get("string").getSqlTypeName()); + assertEquals(SqlTypeName.CHAR, fields.get("char").getSqlTypeName()); + assertEquals(SqlTypeName.DATE, fields.get("date").getSqlTypeName()); + assertEquals(SqlTypeName.TIME, fields.get("time").getSqlTypeName()); + assertEquals(SqlTypeName.TIMESTAMP, fields.get("timestamp").getSqlTypeName()); + assertEquals( + SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE, 
fields.get("timeWithLocalTZ").getSqlTypeName()); + assertEquals( + SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + fields.get("timestampWithLocalTZ").getSqlTypeName()); + assertEquals(SqlTypeName.BINARY, fields.get("fixedBytes").getSqlTypeName()); + assertEquals(SqlTypeName.VARBINARY, fields.get("variableBytes").getSqlTypeName()); + assertEquals(SqlTypeName.CHAR, fields.get("fixedString").getSqlTypeName()); + assertEquals(SqlTypeName.VARCHAR, fields.get("variableString").getSqlTypeName()); + assertEquals(SqlTypeName.DECIMAL, fields.get("customDecimal").getSqlTypeName()); } @Test public void testToCalciteRowTypeNullable() { final Schema schema = Schema.builder() - .addNullableField("f1", Schema.FieldType.BYTE) - .addNullableField("f2", Schema.FieldType.INT16) - .addNullableField("f3", Schema.FieldType.INT32) - .addNullableField("f4", Schema.FieldType.INT64) - .addNullableField("f5", Schema.FieldType.FLOAT) - .addNullableField("f6", Schema.FieldType.DOUBLE) - .addNullableField("f7", Schema.FieldType.DECIMAL) - .addNullableField("f8", Schema.FieldType.BOOLEAN) - .addNullableField("f9", Schema.FieldType.BYTES) - .addNullableField("f10", Schema.FieldType.STRING) + .addNullableField("byte", Schema.FieldType.BYTE) + .addNullableField("short", Schema.FieldType.INT16) + .addNullableField("int", Schema.FieldType.INT32) + .addNullableField("long", Schema.FieldType.INT64) + .addNullableField("float", Schema.FieldType.FLOAT) + .addNullableField("double", Schema.FieldType.DOUBLE) + .addNullableField("decimal", Schema.FieldType.DECIMAL) + .addNullableField("boolean", Schema.FieldType.BOOLEAN) + .addNullableField("byteArray", Schema.FieldType.BYTES) + .addNullableField("string", Schema.FieldType.STRING) + .addNullableField("char", CalciteUtils.CHAR) + .addNullableField("date", CalciteUtils.DATE) + .addNullableField("time", CalciteUtils.TIME) + .addNullableField("timestamp", CalciteUtils.TIMESTAMP) + .addNullableField("timeWithLocalTZ", CalciteUtils.TIME_WITH_LOCAL_TZ) + 
.addNullableField("timestampWithLocalTZ", CalciteUtils.TIMESTAMP_WITH_LOCAL_TZ) + .addNullableField("fixedBytes", Schema.FieldType.logicalType(FixedBytes.of(10))) + .addNullableField( + "variableBytes", Schema.FieldType.logicalType(VariableLengthBytes.of(100))) + .addNullableField("fixedString", Schema.FieldType.logicalType(FixedLengthString.of(10))) + .addNullableField( + "variableString", Schema.FieldType.logicalType(VariableLengthString.of(100))) + .addNullableField( + "customDecimal", Schema.FieldType.logicalType(LogicalDecimal.of(10, 5))) .build(); final Map fields = calciteRowTypeFields(schema); - assertEquals(10, fields.size()); + assertEquals(21, fields.size()); fields.values().forEach(x -> assertTrue(x.isNullable())); - assertEquals(SqlTypeName.TINYINT, fields.get("f1").getSqlTypeName()); - assertEquals(SqlTypeName.SMALLINT, fields.get("f2").getSqlTypeName()); - assertEquals(SqlTypeName.INTEGER, fields.get("f3").getSqlTypeName()); - assertEquals(SqlTypeName.BIGINT, fields.get("f4").getSqlTypeName()); - assertEquals(SqlTypeName.FLOAT, fields.get("f5").getSqlTypeName()); - assertEquals(SqlTypeName.DOUBLE, fields.get("f6").getSqlTypeName()); - assertEquals(SqlTypeName.DECIMAL, fields.get("f7").getSqlTypeName()); - assertEquals(SqlTypeName.BOOLEAN, fields.get("f8").getSqlTypeName()); - assertEquals(SqlTypeName.VARBINARY, fields.get("f9").getSqlTypeName()); - assertEquals(SqlTypeName.VARCHAR, fields.get("f10").getSqlTypeName()); + assertEquals(SqlTypeName.TINYINT, fields.get("byte").getSqlTypeName()); + assertEquals(SqlTypeName.SMALLINT, fields.get("short").getSqlTypeName()); + assertEquals(SqlTypeName.INTEGER, fields.get("int").getSqlTypeName()); + assertEquals(SqlTypeName.BIGINT, fields.get("long").getSqlTypeName()); + assertEquals(SqlTypeName.FLOAT, fields.get("float").getSqlTypeName()); + assertEquals(SqlTypeName.DOUBLE, fields.get("double").getSqlTypeName()); + assertEquals(SqlTypeName.DECIMAL, fields.get("decimal").getSqlTypeName()); + 
assertEquals(SqlTypeName.BOOLEAN, fields.get("boolean").getSqlTypeName()); + assertEquals(SqlTypeName.VARBINARY, fields.get("byteArray").getSqlTypeName()); + assertEquals(SqlTypeName.VARCHAR, fields.get("string").getSqlTypeName()); + assertEquals(SqlTypeName.CHAR, fields.get("char").getSqlTypeName()); + assertEquals(SqlTypeName.DATE, fields.get("date").getSqlTypeName()); + assertEquals(SqlTypeName.TIME, fields.get("time").getSqlTypeName()); + assertEquals(SqlTypeName.TIMESTAMP, fields.get("timestamp").getSqlTypeName()); + assertEquals( + SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE, fields.get("timeWithLocalTZ").getSqlTypeName()); + assertEquals( + SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + fields.get("timestampWithLocalTZ").getSqlTypeName()); + assertEquals(SqlTypeName.BINARY, fields.get("fixedBytes").getSqlTypeName()); + assertEquals(SqlTypeName.VARBINARY, fields.get("variableBytes").getSqlTypeName()); + assertEquals(SqlTypeName.CHAR, fields.get("fixedString").getSqlTypeName()); + assertEquals(SqlTypeName.VARCHAR, fields.get("variableString").getSqlTypeName()); + assertEquals(SqlTypeName.DECIMAL, fields.get("customDecimal").getSqlTypeName()); } @Test diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index 045acd206fa4..773f6d9f6ba8 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.jdbc; import java.sql.Date; -import java.sql.JDBCType; import java.sql.Time; import java.sql.Timestamp; import java.util.Calendar; @@ -112,37 +111,40 @@ static JdbcIO.PreparedStatementSetCaller getPreparedStatementSetCaller( element.getArray(fieldWithIndex.getIndex()).toArray())); case LOGICAL_TYPE: { - String logicalTypeName = fieldType.getLogicalType().getIdentifier(); - JDBCType jdbcType = 
JDBCType.valueOf(logicalTypeName); - switch (jdbcType) { - case DATE: - return (element, ps, i, fieldWithIndex) -> - ps.setDate( - i + 1, - new Date( - getDateOrTimeOnly( - element.getDateTime(fieldWithIndex.getIndex()).toDateTime(), true) - .getTime() - .getTime())); - case TIME: - return (element, ps, i, fieldWithIndex) -> - ps.setTime( - i + 1, - new Time( - getDateOrTimeOnly( - element.getDateTime(fieldWithIndex.getIndex()).toDateTime(), - false) - .getTime() - .getTime())); - case TIMESTAMP_WITH_TIMEZONE: - return (element, ps, i, fieldWithIndex) -> { - Calendar calendar = - withTimestampAndTimezone( - element.getDateTime(fieldWithIndex.getIndex()).toDateTime()); - ps.setTimestamp(i + 1, new Timestamp(calendar.getTime().getTime()), calendar); - }; - default: - return getPreparedStatementSetCaller(fieldType.getLogicalType().getBaseType()); + String identifier = fieldType.getLogicalType().getIdentifier(); + if (LogicalTypes.JDBC_DATE_TYPE.getLogicalType().getIdentifier().equals(identifier)) { + return (element, ps, i, fieldWithIndex) -> + ps.setDate( + i + 1, + new Date( + getDateOrTimeOnly( + element.getDateTime(fieldWithIndex.getIndex()).toDateTime(), true) + .getTime() + .getTime())); + } else if (LogicalTypes.JDBC_TIME_TYPE + .getLogicalType() + .getIdentifier() + .equals(identifier)) { + return (element, ps, i, fieldWithIndex) -> + ps.setTime( + i + 1, + new Time( + getDateOrTimeOnly( + element.getDateTime(fieldWithIndex.getIndex()).toDateTime(), false) + .getTime() + .getTime())); + } else if (LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE + .getLogicalType() + .getIdentifier() + .equals(identifier)) { + return (element, ps, i, fieldWithIndex) -> { + Calendar calendar = + withTimestampAndTimezone( + element.getDateTime(fieldWithIndex.getIndex()).toDateTime()); + ps.setTimestamp(i + 1, new Timestamp(calendar.getTime().getTime()), calendar); + }; + } else { + return getPreparedStatementSetCaller(fieldType.getLogicalType().getBaseType()); } } default: diff 
--git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java index de6d236c98c1..946ca7f1ca26 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java @@ -17,40 +17,23 @@ */ package org.apache.beam.sdk.io.jdbc; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import java.math.BigDecimal; import java.sql.JDBCType; import java.time.Instant; -import java.util.Arrays; -import java.util.Objects; -import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; /** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType} implementations of JDBC types. 
*/ @Experimental(Kind.SCHEMAS) class LogicalTypes { - static final Schema.FieldType JDBC_BIT_TYPE = - Schema.FieldType.logicalType( - new PassThroughLogicalType( - JDBCType.BIT.getName(), FieldType.STRING, "", Schema.FieldType.BOOLEAN) {}); static final Schema.FieldType JDBC_DATE_TYPE = Schema.FieldType.logicalType( new PassThroughLogicalType( JDBCType.DATE.getName(), FieldType.STRING, "", Schema.FieldType.DATETIME) {}); - static final Schema.FieldType JDBC_FLOAT_TYPE = - Schema.FieldType.logicalType( - new PassThroughLogicalType( - JDBCType.FLOAT.getName(), FieldType.STRING, "", Schema.FieldType.DOUBLE) {}); - static final Schema.FieldType JDBC_TIME_TYPE = Schema.FieldType.logicalType( new PassThroughLogicalType( @@ -63,206 +46,4 @@ class LogicalTypes { FieldType.STRING, "", Schema.FieldType.DATETIME) {}); - - @VisibleForTesting - static Schema.FieldType fixedLengthString(JDBCType jdbcType, int length) { - return Schema.FieldType.logicalType(FixedLengthString.of(jdbcType.getName(), length)); - } - - @VisibleForTesting - static Schema.FieldType fixedLengthBytes(JDBCType jdbcType, int length) { - return Schema.FieldType.logicalType(FixedLengthBytes.of(jdbcType.getName(), length)); - } - - @VisibleForTesting - static Schema.FieldType variableLengthString(JDBCType jdbcType, int length) { - return Schema.FieldType.logicalType(VariableLengthString.of(jdbcType.getName(), length)); - } - - @VisibleForTesting - static Schema.FieldType variableLengthBytes(JDBCType jdbcType, int length) { - return Schema.FieldType.logicalType(VariableLengthBytes.of(jdbcType.getName(), length)); - } - - @VisibleForTesting - static Schema.FieldType numeric(int precision, int scale) { - return Schema.FieldType.logicalType( - FixedPrecisionNumeric.of(JDBCType.NUMERIC.getName(), precision, scale)); - } - - /** Base class for JDBC logical types. 
*/ - abstract static class JdbcLogicalType implements Schema.LogicalType { - protected final String identifier; - protected final Schema.FieldType argumentType; - protected final Schema.FieldType baseType; - protected final Object argument; - - protected JdbcLogicalType( - String identifier, - Schema.FieldType argumentType, - Schema.FieldType baseType, - Object argument) { - this.identifier = identifier; - this.argumentType = argumentType; - this.baseType = baseType; - this.argument = argument; - } - - @Override - public String getIdentifier() { - return identifier; - } - - @Override - public FieldType getArgumentType() { - return argumentType; - } - - @Override - @SuppressWarnings("TypeParameterUnusedInFormals") - public ArgumentT getArgument() { - return (ArgumentT) argument; - } - - @Override - public Schema.FieldType getBaseType() { - return baseType; - } - - @Override - public T toBaseType(T input) { - return input; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof JdbcLogicalType)) { - return false; - } - JdbcLogicalType that = (JdbcLogicalType) o; - return Objects.equals(identifier, that.identifier) - && Objects.equals(baseType, that.baseType) - && Objects.equals(argument, that.argument); - } - - @Override - public int hashCode() { - return Objects.hash(identifier, baseType, argument); - } - } - - /** Fixed length string types such as CHAR. 
*/ - static final class FixedLengthString extends JdbcLogicalType { - private final int length; - - static FixedLengthString of(String identifier, int length) { - return new FixedLengthString(identifier, length); - } - - private FixedLengthString(String identifier, int length) { - super(identifier, FieldType.INT32, Schema.FieldType.STRING, length); - this.length = length; - } - - @Override - public String toInputType(String base) { - checkArgument(base == null || base.length() <= length); - return StringUtils.rightPad(base, length); - } - } - - /** Fixed length byte types such as BINARY. */ - static final class FixedLengthBytes extends JdbcLogicalType { - private final int length; - - static FixedLengthBytes of(String identifier, int length) { - return new FixedLengthBytes(identifier, length); - } - - private FixedLengthBytes(String identifier, int length) { - super(identifier, FieldType.INT32, Schema.FieldType.BYTES, length); - this.length = length; - } - - @Override - public byte[] toInputType(byte[] base) { - checkArgument(base == null || base.length <= length); - if (base == null || base.length == length) { - return base; - } else { - return Arrays.copyOf(base, length); - } - } - } - - /** Variable length string types such as VARCHAR and LONGVARCHAR. */ - static final class VariableLengthString extends JdbcLogicalType { - private final int maxLength; - - static VariableLengthString of(String identifier, int maxLength) { - return new VariableLengthString(identifier, maxLength); - } - - private VariableLengthString(String identifier, int maxLength) { - super(identifier, FieldType.INT32, Schema.FieldType.STRING, maxLength); - this.maxLength = maxLength; - } - - @Override - public String toInputType(String base) { - checkArgument(base == null || base.length() <= maxLength); - return base; - } - } - - /** Variable length bytes types such as VARBINARY and LONGVARBINARY. 
*/ - static final class VariableLengthBytes extends JdbcLogicalType { - private final int maxLength; - - static VariableLengthBytes of(String identifier, int maxLength) { - return new VariableLengthBytes(identifier, maxLength); - } - - private VariableLengthBytes(String identifier, int maxLength) { - super(identifier, FieldType.INT32, Schema.FieldType.BYTES, maxLength); - this.maxLength = maxLength; - } - - @Override - public byte[] toInputType(byte[] base) { - checkArgument(base == null || base.length <= maxLength); - return base; - } - } - - /** Fixed precision numeric types such as NUMERIC. */ - static final class FixedPrecisionNumeric extends JdbcLogicalType { - private final int precision; - private final int scale; - - static FixedPrecisionNumeric of(String identifier, int precision, int scale) { - Schema schema = Schema.builder().addInt32Field("precision").addInt32Field("scale").build(); - return new FixedPrecisionNumeric(schema, identifier, precision, scale); - } - - private FixedPrecisionNumeric( - Schema argumentSchema, String identifier, int precision, int scale) { - super( - identifier, - FieldType.row(argumentSchema), - Schema.FieldType.DECIMAL, - Row.withSchema(argumentSchema).addValues(precision, scale).build()); - this.precision = precision; - this.scale = scale; - } - - @Override - public BigDecimal toInputType(BigDecimal base) { - checkArgument(base == null || (base.precision() == precision && base.scale() == scale)); - return base; - } - } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java index a30bfdd6e0c2..6efeaa9983f1 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java @@ -17,16 +17,6 @@ */ package org.apache.beam.sdk.io.jdbc; -import static java.sql.JDBCType.BINARY; -import static java.sql.JDBCType.CHAR; 
-import static java.sql.JDBCType.LONGNVARCHAR; -import static java.sql.JDBCType.LONGVARBINARY; -import static java.sql.JDBCType.LONGVARCHAR; -import static java.sql.JDBCType.NCHAR; -import static java.sql.JDBCType.NUMERIC; -import static java.sql.JDBCType.NVARCHAR; -import static java.sql.JDBCType.VARBINARY; -import static java.sql.JDBCType.VARCHAR; import static java.sql.JDBCType.valueOf; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; @@ -47,12 +37,17 @@ import java.util.EnumMap; import java.util.List; import java.util.TimeZone; -import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.FixedLengthString; +import org.apache.beam.sdk.schemas.logicaltypes.LogicalDecimal; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthString; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.joda.time.DateTime; @@ -109,35 +104,33 @@ private static BeamFieldConverter jdbcTypeToBeamFieldConverter(JDBCType jdbcType case BIGINT: return beamFieldOfType(Schema.FieldType.INT64); case BINARY: - return beamLogicalField(BINARY.getName(), LogicalTypes.FixedLengthBytes::of); + return beamLogicalField(FixedBytes::of); case BIT: - return beamFieldOfType(LogicalTypes.JDBC_BIT_TYPE); case BOOLEAN: return beamFieldOfType(Schema.FieldType.BOOLEAN); case CHAR: - return beamLogicalField(CHAR.getName(), LogicalTypes.FixedLengthString::of); + case NCHAR: + return beamLogicalField(FixedLengthString::of); case DATE: return 
beamFieldOfType(LogicalTypes.JDBC_DATE_TYPE); case DECIMAL: return beamFieldOfType(Schema.FieldType.DECIMAL); case DOUBLE: - return beamFieldOfType(Schema.FieldType.DOUBLE); case FLOAT: - return beamFieldOfType(LogicalTypes.JDBC_FLOAT_TYPE); + // JDBCType.FLOAT maps to double Java Type as per JDBC 4.3 spec TABLE B-1. + return beamFieldOfType(Schema.FieldType.DOUBLE); case INTEGER: return beamFieldOfType(Schema.FieldType.INT32); case LONGNVARCHAR: - return beamLogicalField(LONGNVARCHAR.getName(), LogicalTypes.VariableLengthString::of); - case LONGVARBINARY: - return beamLogicalField(LONGVARBINARY.getName(), LogicalTypes.VariableLengthBytes::of); case LONGVARCHAR: - return beamLogicalField(LONGVARCHAR.getName(), LogicalTypes.VariableLengthString::of); - case NCHAR: - return beamLogicalField(NCHAR.getName(), LogicalTypes.FixedLengthString::of); - case NUMERIC: - return beamLogicalNumericField(NUMERIC.getName()); case NVARCHAR: - return beamLogicalField(NVARCHAR.getName(), LogicalTypes.VariableLengthString::of); + case VARCHAR: + return beamLogicalField(VariableLengthString::of); + case LONGVARBINARY: + case VARBINARY: + return beamLogicalField(VariableLengthBytes::of); + case NUMERIC: + return beamLogicalNumericField(); case REAL: return beamFieldOfType(Schema.FieldType.FLOAT); case SMALLINT: @@ -150,10 +143,6 @@ private static BeamFieldConverter jdbcTypeToBeamFieldConverter(JDBCType jdbcType return beamFieldOfType(LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE); case TINYINT: return beamFieldOfType(Schema.FieldType.BYTE); - case VARBINARY: - return beamLogicalField(VARBINARY.getName(), LogicalTypes.VariableLengthBytes::of); - case VARCHAR: - return beamLogicalField(VARCHAR.getName(), LogicalTypes.VariableLengthString::of); default: throw new UnsupportedOperationException( "Converting " + jdbcType + " to Beam schema type is not supported"); @@ -184,24 +173,21 @@ private static BeamFieldConverter beamFieldOfType(Schema.FieldType fieldType) { /** Converts logical types 
with arguments such as VARCHAR(25). */ private static BeamFieldConverter beamLogicalField( - String identifier, - BiFunction> constructor) { + Function> constructor) { return (index, md) -> { int size = md.getPrecision(index); - Schema.FieldType fieldType = - Schema.FieldType.logicalType(constructor.apply(identifier, size)); + Schema.FieldType fieldType = Schema.FieldType.logicalType(constructor.apply(size)); return beamFieldOfType(fieldType).create(index, md); }; } /** Converts numeric fields with specified precision and scale. */ - private static BeamFieldConverter beamLogicalNumericField(String identifier) { + private static BeamFieldConverter beamLogicalNumericField() { return (index, md) -> { int precision = md.getPrecision(index); int scale = md.getScale(index); Schema.FieldType fieldType = - Schema.FieldType.logicalType( - LogicalTypes.FixedPrecisionNumeric.of(identifier, precision, scale)); + Schema.FieldType.logicalType(LogicalDecimal.of(precision, scale)); return beamFieldOfType(fieldType).create(index, md); }; } @@ -262,18 +248,19 @@ private static ResultSetFieldExtractor createArrayExtractor( /** Creates a {@link ResultSetFieldExtractor} for logical types. 
*/ private static ResultSetFieldExtractor createLogicalTypeExtractor( final Schema.LogicalType fieldType) { - String logicalTypeName = fieldType.getIdentifier(); - JDBCType underlyingType = JDBCType.valueOf(logicalTypeName); - switch (underlyingType) { - case DATE: - return DATE_EXTRACTOR; - case TIME: - return TIME_EXTRACTOR; - case TIMESTAMP_WITH_TIMEZONE: - return TIMESTAMP_EXTRACTOR; - default: - ResultSetFieldExtractor extractor = createFieldExtractor(fieldType.getBaseType()); - return (rs, index) -> fieldType.toInputType((BaseT) extractor.extract(rs, index)); + String identifier = fieldType.getIdentifier(); + if (LogicalTypes.JDBC_DATE_TYPE.getLogicalType().getIdentifier().equals(identifier)) { + return DATE_EXTRACTOR; + } else if (LogicalTypes.JDBC_TIME_TYPE.getLogicalType().getIdentifier().equals(identifier)) { + return TIME_EXTRACTOR; + } else if (LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE + .getLogicalType() + .getIdentifier() + .equals(identifier)) { + return TIMESTAMP_EXTRACTOR; + } else { + ResultSetFieldExtractor extractor = createFieldExtractor(fieldType.getBaseType()); + return (rs, index) -> fieldType.toInputType((BaseT) extractor.extract(rs, index)); } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 72985cb85369..37c3dab2b822 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -37,7 +37,6 @@ import java.sql.Array; import java.sql.Connection; import java.sql.Date; -import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -60,6 +59,7 @@ import org.apache.beam.sdk.io.common.TestRow; import org.apache.beam.sdk.io.jdbc.JdbcIO.PoolableDataSourceProvider; import org.apache.beam.sdk.schemas.Schema; +import 
org.apache.beam.sdk.schemas.logicaltypes.VariableLengthString; import org.apache.beam.sdk.schemas.transforms.Select; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.PAssert; @@ -336,7 +336,7 @@ public void testReadRowsWithDataSourceConfiguration() { Schema expectedSchema = Schema.of( - Schema.Field.of("NAME", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 500)) + Schema.Field.of("NAME", Schema.FieldType.logicalType(VariableLengthString.of(500))) .withNullable(true), Schema.Field.of("ID", Schema.FieldType.INT32).withNullable(true)); @@ -364,7 +364,7 @@ public void testReadRowsWithoutStatementPreparator() { Schema expectedSchema = Schema.of( - Schema.Field.of("NAME", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 500)) + Schema.Field.of("NAME", Schema.FieldType.logicalType(VariableLengthString.of(500))) .withNullable(true), Schema.Field.of("ID", Schema.FieldType.INT32).withNullable(true)); diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java index 7ec9e7bc8e83..824748e737d4 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java @@ -37,6 +37,11 @@ import java.sql.Timestamp; import java.sql.Types; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.FixedLengthString; +import org.apache.beam.sdk.schemas.logicaltypes.LogicalDecimal; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthString; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.joda.time.DateTime; @@ -98,32 +103,32 @@ public void testToBeamSchema() throws 
SQLException { Schema.builder() .addArrayField("int_array_col", Schema.FieldType.INT32) .addField("bigint_col", Schema.FieldType.INT64) - .addField("binary_col", LogicalTypes.fixedLengthBytes(JDBCType.BINARY, 255)) - .addField("bit_col", LogicalTypes.JDBC_BIT_TYPE) + .addField("binary_col", Schema.FieldType.logicalType(FixedBytes.of(255))) + .addField("bit_col", Schema.FieldType.BOOLEAN) .addField("boolean_col", Schema.FieldType.BOOLEAN) - .addField("char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 255)) + .addField("char_col", Schema.FieldType.logicalType(FixedLengthString.of(255))) .addField("date_col", LogicalTypes.JDBC_DATE_TYPE) .addField("decimal_col", Schema.FieldType.DECIMAL) .addField("double_col", Schema.FieldType.DOUBLE) - .addField("float_col", LogicalTypes.JDBC_FLOAT_TYPE) + .addField("float_col", Schema.FieldType.DOUBLE) .addField("integer_col", Schema.FieldType.INT32) .addField( - "longnvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGNVARCHAR, 1024)) + "longnvarchar_col", Schema.FieldType.logicalType(VariableLengthString.of(1024))) .addField( - "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 1024)) + "longvarchar_col", Schema.FieldType.logicalType(VariableLengthString.of(1024))) .addField( - "longvarbinary_col", LogicalTypes.variableLengthBytes(JDBCType.LONGVARBINARY, 1024)) - .addField("nchar_col", LogicalTypes.fixedLengthString(JDBCType.NCHAR, 255)) - .addField("numeric_col", LogicalTypes.numeric(12, 4)) - .addField("nvarchar_col", LogicalTypes.variableLengthString(JDBCType.NVARCHAR, 255)) + "longvarbinary_col", Schema.FieldType.logicalType(VariableLengthBytes.of(1024))) + .addField("nchar_col", Schema.FieldType.logicalType(FixedLengthString.of(255))) + .addField("numeric_col", Schema.FieldType.logicalType(LogicalDecimal.of(12, 4))) + .addField("nvarchar_col", Schema.FieldType.logicalType(VariableLengthString.of(255))) .addField("real_col", Schema.FieldType.FLOAT) .addField("smallint_col", 
Schema.FieldType.INT16) .addField("time_col", LogicalTypes.JDBC_TIME_TYPE) .addField("timestamp_col", Schema.FieldType.DATETIME) .addField("timestamptz_col", LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE) .addField("tinyint_col", Schema.FieldType.BYTE) - .addField("varbinary_col", LogicalTypes.variableLengthBytes(JDBCType.VARBINARY, 255)) - .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255)) + .addField("varbinary_col", Schema.FieldType.logicalType(VariableLengthBytes.of(255))) + .addField("varchar_col", Schema.FieldType.logicalType(VariableLengthString.of(255))) .build(); Schema haveBeamSchema = SchemaUtil.toBeamSchema(mockResultSetMetaData); @@ -326,20 +331,20 @@ public void testSchemaFieldTypeComparator() { assertFalse(SchemaUtil.compareSchemaFieldType(Schema.FieldType.STRING, Schema.FieldType.INT16)); assertTrue( SchemaUtil.compareSchemaFieldType( - LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255), - LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255))); + Schema.FieldType.logicalType(VariableLengthString.of(255)), + Schema.FieldType.logicalType(VariableLengthString.of(255)))); assertFalse( SchemaUtil.compareSchemaFieldType( - LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255), - LogicalTypes.fixedLengthBytes(JDBCType.BIT, 255))); + Schema.FieldType.logicalType(VariableLengthString.of(255)), + Schema.FieldType.logicalType(FixedBytes.of(255)))); assertTrue( SchemaUtil.compareSchemaFieldType( - Schema.FieldType.STRING, LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255))); + Schema.FieldType.STRING, Schema.FieldType.logicalType(VariableLengthString.of(255)))); assertFalse( SchemaUtil.compareSchemaFieldType( - Schema.FieldType.INT16, LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255))); + Schema.FieldType.INT16, Schema.FieldType.logicalType(VariableLengthString.of(255)))); assertTrue( SchemaUtil.compareSchemaFieldType( - LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255), 
Schema.FieldType.STRING)); + Schema.FieldType.logicalType(VariableLengthString.of(255)), Schema.FieldType.STRING)); } } From 87e4ebcd7ef6d713697829cc5911b5adf078dd16 Mon Sep 17 00:00:00 2001 From: rahul8383 Date: Thu, 14 May 2020 09:15:44 +0530 Subject: [PATCH 2/2] Add Test for Logical Types which exercises coder --- .../schemas/logicaltypes/LogicalDecimal.java | 2 +- .../apache/beam/sdk/coders/RowCoderTest.java | 36 +++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalDecimal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalDecimal.java index 14384559dcd8..558ce79a347c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalDecimal.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalDecimal.java @@ -38,7 +38,7 @@ private LogicalDecimal(int precision, int scale) { IDENTIFIER, Schema.FieldType.row(schema), Row.withSchema(schema).addValues(precision, scale).build(), - Schema.FieldType.STRING); + Schema.FieldType.DECIMAL); this.precision = precision; this.scale = scale; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java index 79202869bcaf..190da6738c5e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java @@ -28,6 +28,11 @@ import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType.Value; +import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.FixedLengthString; +import org.apache.beam.sdk.schemas.logicaltypes.LogicalDecimal; +import 
org.apache.beam.sdk.schemas.logicaltypes.VariableLengthBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableLengthString; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -164,8 +169,35 @@ public void testIterableOfIterable() throws Exception { @Test public void testLogicalType() throws Exception { EnumerationType enumeration = EnumerationType.create("one", "two", "three"); - Schema schema = Schema.builder().addLogicalTypeField("f_enum", enumeration).build(); - Row row = Row.withSchema(schema).addValue(enumeration.valueOf("two")).build(); + FixedBytes fixedBytes = FixedBytes.of(5); + VariableLengthBytes variableLengthBytes = VariableLengthBytes.of(10); + FixedLengthString fixedLengthString = FixedLengthString.of(5); + VariableLengthString variableLengthString = VariableLengthString.of(10); + LogicalDecimal logicalDecimal = LogicalDecimal.of(10, 5); + + Schema schema = + Schema.builder() + .addLogicalTypeField("f_enum", enumeration) + .addLogicalTypeField("f_fixed_bytes", fixedBytes) + .addLogicalTypeField("f_variable_bytes", variableLengthBytes) + .addLogicalTypeField("f_fixed_string", fixedLengthString) + .addLogicalTypeField("f_variable_string", variableLengthString) + .addLogicalTypeField("f_logical_decimal", logicalDecimal) + .build(); + + byte[] bytes = {1, 2, 3, 4, 5}; + String string = "fixed"; + BigDecimal customDecimal = new BigDecimal("12345.12345"); + + Row row = + Row.withSchema(schema) + .addValue(enumeration.valueOf("two")) + .addValue(bytes) + .addValue(bytes) + .addValue(string) + .addValue(string) + .addValue(customDecimal) + .build(); CoderProperties.coderDecodeEncodeEqual(RowCoder.of(schema), row); }