From 60e465e40d413ea6fb8c113ff5619c30be5864b0 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 7 Nov 2022 11:52:17 -0500 Subject: [PATCH 1/3] Eliminate CalciteUtils.CharType logical type * Replace CalciteUtils.CharType with String Note that CalciteUtils still omits the precision of BINARY/VARBINARY/CHAR/VARCHAR as it originally did. Supporting the precision of these calcite types involves making use of the overloaded method RelDataTypeFactory.createSqlType(var1, var2). * Replace every reference to CalciteUtils.CharType with a generic PassThroughLogicalType check --- .../org/apache/beam/sdk/util/RowJsonTest.java | 12 +++------ .../extensions/sql/impl/rel/BeamCalcRel.java | 25 ++++++++----------- .../sql/impl/rel/BeamEnumerableConverter.java | 9 ++++--- .../sql/impl/utils/CalciteUtils.java | 19 +++----------- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 15 +++++++---- .../sdk/io/gcp/bigtable/CellValueParser.java | 11 +++++--- 6 files changed, 42 insertions(+), 49 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java index 229268830b86..9509d2a10769 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java @@ -32,7 +32,7 @@ import java.util.Collection; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer; import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer.NullBehavior; import org.apache.beam.sdk.util.RowJson.RowJsonSerializer; @@ -132,14 +132,10 @@ private static Object[] makeFlatRowTestCase() { private static Object[] makeLogicalTypeTestCase() { Schema schema = - Schema.builder() - .addLogicalTypeField( 
- "f_passThroughString", - new PassThroughLogicalType( - "SqlCharType", FieldType.STRING, "", FieldType.STRING) {}) - .build(); + Schema.builder().addLogicalTypeField("f_passThroughString", FixedString.of(10)).build(); - String rowString = "{\n" + "\"f_passThroughString\" : \"hello\"\n" + "}"; + // fixed string will do padding + String rowString = "{\n" + "\"f_passThroughString\" : \"hello \"\n" + "}"; Row expectedRow = Row.withSchema(schema).addValues("hello").build(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 368bf8dc4645..7ef55ffef7b8 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -49,12 +49,13 @@ import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType; import org.apache.beam.sdk.schemas.FieldAccessDescriptor; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; +import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; @@ -407,15 +408,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu } return 
toBeamRow((List) value, fieldType.getRowSchema(), verifyValues); case LOGICAL_TYPE: - String identifier = fieldType.getLogicalType().getIdentifier(); - if (CharType.IDENTIFIER.equals(identifier) - || FixedString.IDENTIFIER.equals(identifier) - || VariableString.IDENTIFIER.equals(identifier)) { - return (String) value; - } else if (FixedBytes.IDENTIFIER.equals(identifier) - || VariableBytes.IDENTIFIER.equals(identifier)) { - return (byte[]) value; - } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { + LogicalType logicalType = fieldType.getLogicalType(); + assert logicalType != null; + String identifier = logicalType.getIdentifier(); + if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { return Instant.ofEpochMilli(((Number) value).longValue()); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { if (value instanceof Date) { @@ -440,6 +436,9 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu LocalTime.ofNanoOfDay( (((Number) value).longValue() % MILLIS_PER_DAY) * NANOS_PER_MILLISECOND)); } else { + if (logicalType instanceof PassThroughLogicalType) { + return toBeamObject(value, logicalType.getBaseType(), verifyValues); + } throw new UnsupportedOperationException("Unable to convert logical type " + identifier); } default: @@ -561,8 +560,7 @@ private static Expression getBeamField( break; case LOGICAL_TYPE: String identifier = fieldType.getLogicalType().getIdentifier(); - if (CharType.IDENTIFIER.equals(identifier) - || FixedString.IDENTIFIER.equals(identifier) + if (FixedString.IDENTIFIER.equals(identifier) || VariableString.IDENTIFIER.equals(identifier)) { value = Expressions.call(expression, "getString", fieldName); } else if (FixedBytes.IDENTIFIER.equals(identifier) @@ -643,8 +641,7 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) return nullOr(value, toCalciteRow(value, fieldType.getRowSchema())); case LOGICAL_TYPE: String identifier = 
fieldType.getLogicalType().getIdentifier(); - if (CharType.IDENTIFIER.equals(identifier) - || FixedString.IDENTIFIER.equals(identifier) + if (FixedString.IDENTIFIER.equals(identifier) || VariableString.IDENTIFIER.equals(identifier)) { return Expressions.convert_(value, String.class); } else if (FixedBytes.IDENTIFIER.equals(identifier) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 23281ac1261e..64c3fe300023 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.MetricNameFilter; @@ -50,6 +49,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.LogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -318,7 +319,9 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) { switch (type.getTypeName()) { case LOGICAL_TYPE: - String logicalId = type.getLogicalType().getIdentifier(); + LogicalType logicalType = type.getLogicalType(); + assert logicalType != null; + String logicalId = 
logicalType.getIdentifier(); if (SqlTypes.TIME.getIdentifier().equals(logicalId)) { if (beamValue instanceof Long) { // base type return (Long) beamValue; @@ -331,7 +334,7 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) { } else { // input type return (int) ((LocalDate) beamValue).toEpochDay(); } - } else if (CharType.IDENTIFIER.equals(logicalId)) { + } else if (logicalType instanceof PassThroughLogicalType) { return beamValue; } else { throw new UnsupportedOperationException("Unknown DateTime type " + logicalId); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index 4f8d57a4fbc5..126bccf52604 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -57,15 +57,6 @@ public TimeWithLocalTzType() { } } - /** A LogicalType corresponding to CHAR. */ - public static class CharType extends PassThroughLogicalType { - public static final String IDENTIFIER = "SqlCharType"; - - public CharType() { - super(IDENTIFIER, FieldType.STRING, "", FieldType.STRING); - } - } - /** Returns true if the type is any of the various date time types. 
*/ public static boolean isDateTimeType(FieldType fieldType) { if (fieldType.getTypeName() == TypeName.DATETIME) { @@ -90,10 +81,9 @@ public static boolean isStringType(FieldType fieldType) { } if (fieldType.getTypeName().isLogicalType()) { - Schema.LogicalType logicalType = fieldType.getLogicalType(); - Preconditions.checkArgumentNotNull(logicalType); - String logicalId = logicalType.getIdentifier(); - return logicalId.equals(CharType.IDENTIFIER); + Schema.LogicalType logicalType = fieldType.getLogicalType(); + return logicalType instanceof PassThroughLogicalType + && logicalType.getBaseType().getTypeName() == TypeName.STRING; } return false; } @@ -109,7 +99,7 @@ public static boolean isStringType(FieldType fieldType) { public static final FieldType BOOLEAN = FieldType.BOOLEAN; public static final FieldType VARBINARY = FieldType.BYTES; public static final FieldType VARCHAR = FieldType.STRING; - public static final FieldType CHAR = FieldType.logicalType(new CharType()); + public static final FieldType CHAR = FieldType.STRING; public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE); public static final FieldType NULLABLE_DATE = FieldType.logicalType(SqlTypes.DATE).withNullable(true); @@ -136,7 +126,6 @@ public static boolean isStringType(FieldType fieldType) { .put(BOOLEAN, SqlTypeName.BOOLEAN) .put(VARBINARY, SqlTypeName.VARBINARY) .put(VARCHAR, SqlTypeName.VARCHAR) - .put(CHAR, SqlTypeName.CHAR) .put(DATE, SqlTypeName.DATE) .put(TIME, SqlTypeName.TIME) .put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index e152beb623d7..6a340496122b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -61,8 +61,10 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; @@ -262,7 +264,6 @@ public abstract static class Builder { .put(SqlTypes.TIME.getIdentifier(), StandardSQLTypeName.TIME) .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME) .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME) - .put("SqlCharType", StandardSQLTypeName.STRING) .put("Enum", StandardSQLTypeName.STRING) .build(); @@ -280,6 +281,9 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { Preconditions.checkArgumentNotNull(fieldType.getLogicalType()); ret = BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(logicalType.getIdentifier()); if (ret == null) { + if (logicalType instanceof PassThroughLogicalType) { + return toStandardSQLTypeName(logicalType.getBaseType()); + } throw new IllegalArgumentException( "Cannot convert Beam logical type: " + logicalType.getIdentifier() @@ -718,7 +722,6 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso // TODO: BigQuery shouldn't know about SQL internal logical types. 
private static final Set SQL_DATE_TIME_TYPES = ImmutableSet.of("SqlTimeWithLocalTzType"); - private static final Set SQL_STRING_TYPES = ImmutableSet.of("SqlCharType"); /** * Tries to convert an Avro decoded value to a Beam field value based on the target type of the @@ -766,7 +769,9 @@ public static Object convertAvroFormat( case ARRAY: return convertAvroArray(beamFieldType, avroValue, options); case LOGICAL_TYPE: - String identifier = beamFieldType.getLogicalType().getIdentifier(); + LogicalType logicalType = beamFieldType.getLogicalType(); + assert logicalType != null; + String identifier = logicalType.getIdentifier(); if (SqlTypes.DATE.getIdentifier().equals(identifier)) { return convertAvroDate(avroValue); } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) { @@ -784,8 +789,8 @@ public static Object convertAvroFormat( String.format( "Unknown timestamp truncation option: %s", options.getTruncateTimestamps())); } - } else if (SQL_STRING_TYPES.contains(identifier)) { - return convertAvroPrimitiveTypes(TypeName.STRING, avroValue); + } else if (logicalType instanceof PassThroughLogicalType) { + return convertAvroFormat(logicalType.getBaseType(), avroValue, options); } else { throw new RuntimeException("Unknown logical type " + identifier); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java index 1560fccadf2e..3b1ff015d7e7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java @@ -25,6 +25,8 @@ import com.google.protobuf.ByteString; import java.io.Serializable; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.LogicalType; +import 
org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Shorts; @@ -95,11 +97,12 @@ ByteString valueToByteString(Object value, Schema.FieldType type) { case DATETIME: return byteString(value.toString().getBytes(UTF_8)); case LOGICAL_TYPE: - String identifier = checkArgumentNotNull(type.getLogicalType()).getIdentifier(); - if ("SqlCharType".equals(identifier)) { - return byteString(((String) value).getBytes(UTF_8)); + LogicalType logicalType = checkArgumentNotNull(type.getLogicalType()); + if (logicalType instanceof PassThroughLogicalType) { + return valueToByteString(value, logicalType.getBaseType()); } else { - throw new IllegalStateException("Unsupported logical type: " + identifier); + throw new IllegalStateException( + "Unsupported logical type: " + logicalType.getIdentifier()); } default: throw new IllegalStateException("Unsupported type: " + type.getTypeName()); From 32590501f106d194d5f22841b841213863239523 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 7 Nov 2022 16:16:26 -0500 Subject: [PATCH 2/3] Add TODO to Support sql types with arguments --- .../beam/sdk/extensions/sql/impl/utils/CalciteUtils.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index 126bccf52604..153acce03e1b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -97,6 +97,7 @@ public static boolean isStringType(FieldType fieldType) { public static final 
FieldType DOUBLE = FieldType.DOUBLE; public static final FieldType DECIMAL = FieldType.DECIMAL; public static final FieldType BOOLEAN = FieldType.BOOLEAN; + // TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments public static final FieldType VARBINARY = FieldType.BYTES; public static final FieldType VARCHAR = FieldType.STRING; public static final FieldType CHAR = FieldType.STRING; @@ -143,6 +144,9 @@ public static boolean isStringType(FieldType fieldType) { .put(SqlTypeName.DOUBLE, DOUBLE) .put(SqlTypeName.DECIMAL, DECIMAL) .put(SqlTypeName.BOOLEAN, BOOLEAN) + // TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments + // Handle Calcite VARBINARY/BINARY/VARCHAR/CHAR with + // VariableBinary/FixedBinary/VariableString/FixedString logical types. .put(SqlTypeName.VARBINARY, VARBINARY) .put(SqlTypeName.BINARY, VARBINARY) .put(SqlTypeName.VARCHAR, VARCHAR) From 48b842bbecd9caf890f0be511617aadc50976106 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 7 Nov 2022 16:20:26 -0500 Subject: [PATCH 3/3] Use VariableString in LogicalTypeTestCase --- .../test/java/org/apache/beam/sdk/util/RowJsonTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java index 9509d2a10769..b8b8efa457d1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java @@ -32,7 +32,7 @@ import java.util.Collection; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.logicaltypes.FixedString; +import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer; import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer.NullBehavior; import 
org.apache.beam.sdk.util.RowJson.RowJsonSerializer; @@ -132,10 +132,12 @@ private static Object[] makeFlatRowTestCase() { private static Object[] makeLogicalTypeTestCase() { Schema schema = - Schema.builder().addLogicalTypeField("f_passThroughString", FixedString.of(10)).build(); + Schema.builder() + .addLogicalTypeField("f_passThroughString", VariableString.of(10)) + .build(); // fixed string will do padding - String rowString = "{\n" + "\"f_passThroughString\" : \"hello \"\n" + "}"; + String rowString = "{\n" + "\"f_passThroughString\" : \"hello\"\n" + "}"; Row expectedRow = Row.withSchema(schema).addValues("hello").build();