Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Collection;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer.NullBehavior;
import org.apache.beam.sdk.util.RowJson.RowJsonSerializer;
Expand Down Expand Up @@ -133,12 +133,10 @@ private static Object[] makeFlatRowTestCase() {
private static Object[] makeLogicalTypeTestCase() {
Schema schema =
Schema.builder()
.addLogicalTypeField(
"f_passThroughString",
new PassThroughLogicalType<String>(
"SqlCharType", FieldType.STRING, "", FieldType.STRING) {})
.addLogicalTypeField("f_passThroughString", VariableString.of(10))
.build();

// fixed string will do padding
String rowString = "{\n" + "\"f_passThroughString\" : \"hello\"\n" + "}";

Row expectedRow = Row.withSchema(schema).addValues("hello").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@
import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
Expand Down Expand Up @@ -407,15 +408,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
}
return toBeamRow((List<Object>) value, fieldType.getRowSchema(), verifyValues);
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
return (String) value;
} else if (FixedBytes.IDENTIFIER.equals(identifier)
|| VariableBytes.IDENTIFIER.equals(identifier)) {
return (byte[]) value;
} else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
LogicalType<?, ?> logicalType = fieldType.getLogicalType();
assert logicalType != null;
String identifier = logicalType.getIdentifier();
if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
return Instant.ofEpochMilli(((Number) value).longValue());
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
if (value instanceof Date) {
Expand All @@ -440,6 +436,9 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
LocalTime.ofNanoOfDay(
(((Number) value).longValue() % MILLIS_PER_DAY) * NANOS_PER_MILLISECOND));
} else {
if (logicalType instanceof PassThroughLogicalType) {
return toBeamObject(value, logicalType.getBaseType(), verifyValues);
}
throw new UnsupportedOperationException("Unable to convert logical type " + identifier);
}
default:
Expand Down Expand Up @@ -561,8 +560,7 @@ private static Expression getBeamField(
break;
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
if (FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
value = Expressions.call(expression, "getString", fieldName);
} else if (FixedBytes.IDENTIFIER.equals(identifier)
Expand Down Expand Up @@ -643,8 +641,7 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
return nullOr(value, toCalciteRow(value, fieldType.getRowSchema()));
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
if (FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
return Expressions.convert_(value, String.class);
} else if (FixedBytes.IDENTIFIER.equals(identifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
Expand All @@ -50,6 +49,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -318,7 +319,9 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {

switch (type.getTypeName()) {
case LOGICAL_TYPE:
String logicalId = type.getLogicalType().getIdentifier();
LogicalType<?, ?> logicalType = type.getLogicalType();
assert logicalType != null;
String logicalId = logicalType.getIdentifier();
if (SqlTypes.TIME.getIdentifier().equals(logicalId)) {
if (beamValue instanceof Long) { // base type
return (Long) beamValue;
Expand All @@ -331,7 +334,7 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
} else { // input type
return (int) ((LocalDate) beamValue).toEpochDay();
}
} else if (CharType.IDENTIFIER.equals(logicalId)) {
} else if (logicalType instanceof PassThroughLogicalType) {
return beamValue;
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + logicalId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ public TimeWithLocalTzType() {
}
}

/** A LogicalType corresponding to CHAR. */
public static class CharType extends PassThroughLogicalType<String> {
public static final String IDENTIFIER = "SqlCharType";

public CharType() {
super(IDENTIFIER, FieldType.STRING, "", FieldType.STRING);
}
}

/** Returns true if the type is any of the various date time types. */
public static boolean isDateTimeType(FieldType fieldType) {
if (fieldType.getTypeName() == TypeName.DATETIME) {
Expand All @@ -90,10 +81,9 @@ public static boolean isStringType(FieldType fieldType) {
}

if (fieldType.getTypeName().isLogicalType()) {
Schema.LogicalType logicalType = fieldType.getLogicalType();
Preconditions.checkArgumentNotNull(logicalType);
String logicalId = logicalType.getIdentifier();
return logicalId.equals(CharType.IDENTIFIER);
Schema.LogicalType<?, ?> logicalType = fieldType.getLogicalType();
return logicalType instanceof PassThroughLogicalType
&& logicalType.getBaseType().getTypeName() == TypeName.STRING;
}
return false;
}
Expand All @@ -107,9 +97,10 @@ public static boolean isStringType(FieldType fieldType) {
public static final FieldType DOUBLE = FieldType.DOUBLE;
public static final FieldType DECIMAL = FieldType.DECIMAL;
public static final FieldType BOOLEAN = FieldType.BOOLEAN;
// TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments
public static final FieldType VARBINARY = FieldType.BYTES;
public static final FieldType VARCHAR = FieldType.STRING;
public static final FieldType CHAR = FieldType.logicalType(new CharType());
public static final FieldType CHAR = FieldType.STRING;
Copy link
Contributor Author

@Abacn Abacn Nov 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid breaking change this public field is preserved but is now the same as VARCHAR (originally Calcite.CharType was also essentially the same as VARCHAR. It did not do padding as FixedString.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree. CHAR is not the same as string, it should have fixed length. What breaks if you make this fixed length?

Copy link
Contributor Author

@Abacn Abacn Nov 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. The "breaking change" I mentioned here is the removal of a public field.

We have the same situation for fixed binary:


and

makes BINARY the same as BYTES in CalciteUtils currently.

A full support for BINARY/VARBINARY/CHAR/VARCHAR with specified limit involves taking Calcite logical type's precision parameter (currently it is omitted) and pass it to FixedBinary/VariableBinary/FixedString/VariableString constructors. We cannot have constant logical type instance for these types because they require an argument in constructor. The full support for these types is left as future work. For now this change just unblocks the use case that mentioned in https://issues.apache.org/jira/browse/BEAM-12323.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entered #24019 and added TODO here. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @apilloud, do you think it suffices to map CHAR to String as we did for mapping BINARY to bytes, or we should also implement #24019 in this PR. Is there any breaking change if treat CHAR as String in Beam?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that this path is only used by string literals used in a SQL statement today. Assuming that is true there is no harm in moving forward with this as is. I wanted to know what breaks here as that would potentially identify other use cases.

public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE);
public static final FieldType NULLABLE_DATE =
FieldType.logicalType(SqlTypes.DATE).withNullable(true);
Expand All @@ -136,7 +127,6 @@ public static boolean isStringType(FieldType fieldType) {
.put(BOOLEAN, SqlTypeName.BOOLEAN)
.put(VARBINARY, SqlTypeName.VARBINARY)
.put(VARCHAR, SqlTypeName.VARCHAR)
.put(CHAR, SqlTypeName.CHAR)
.put(DATE, SqlTypeName.DATE)
.put(TIME, SqlTypeName.TIME)
.put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE)
Expand All @@ -154,6 +144,9 @@ public static boolean isStringType(FieldType fieldType) {
.put(SqlTypeName.DOUBLE, DOUBLE)
.put(SqlTypeName.DECIMAL, DECIMAL)
.put(SqlTypeName.BOOLEAN, BOOLEAN)
// TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments
// Handle Calcite VARBINARY/BINARY/VARCHAR/CHAR with
// VariableBinary/FixedBinary/VariableString/FixedString logical types.
.put(SqlTypeName.VARBINARY, VARBINARY)
.put(SqlTypeName.BINARY, VARBINARY)
.put(SqlTypeName.VARCHAR, VARCHAR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
Expand Down Expand Up @@ -262,7 +264,6 @@ public abstract static class Builder {
.put(SqlTypes.TIME.getIdentifier(), StandardSQLTypeName.TIME)
.put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
.put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
.put("SqlCharType", StandardSQLTypeName.STRING)
.put("Enum", StandardSQLTypeName.STRING)
.build();

Expand All @@ -280,6 +281,9 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
Preconditions.checkArgumentNotNull(fieldType.getLogicalType());
ret = BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(logicalType.getIdentifier());
if (ret == null) {
if (logicalType instanceof PassThroughLogicalType) {
return toStandardSQLTypeName(logicalType.getBaseType());
}
throw new IllegalArgumentException(
"Cannot convert Beam logical type: "
+ logicalType.getIdentifier()
Expand Down Expand Up @@ -718,7 +722,6 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso

// TODO: BigQuery shouldn't know about SQL internal logical types.
private static final Set<String> SQL_DATE_TIME_TYPES = ImmutableSet.of("SqlTimeWithLocalTzType");
private static final Set<String> SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");

/**
* Tries to convert an Avro decoded value to a Beam field value based on the target type of the
Expand Down Expand Up @@ -766,7 +769,9 @@ public static Object convertAvroFormat(
case ARRAY:
return convertAvroArray(beamFieldType, avroValue, options);
case LOGICAL_TYPE:
String identifier = beamFieldType.getLogicalType().getIdentifier();
LogicalType<?, ?> logicalType = beamFieldType.getLogicalType();
assert logicalType != null;
String identifier = logicalType.getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
return convertAvroDate(avroValue);
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
Expand All @@ -784,8 +789,8 @@ public static Object convertAvroFormat(
String.format(
"Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));
}
} else if (SQL_STRING_TYPES.contains(identifier)) {
return convertAvroPrimitiveTypes(TypeName.STRING, avroValue);
} else if (logicalType instanceof PassThroughLogicalType) {
return convertAvroFormat(logicalType.getBaseType(), avroValue, options);
} else {
throw new RuntimeException("Unknown logical type " + identifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.protobuf.ByteString;
import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Shorts;
Expand Down Expand Up @@ -95,11 +97,12 @@ ByteString valueToByteString(Object value, Schema.FieldType type) {
case DATETIME:
return byteString(value.toString().getBytes(UTF_8));
case LOGICAL_TYPE:
String identifier = checkArgumentNotNull(type.getLogicalType()).getIdentifier();
if ("SqlCharType".equals(identifier)) {
return byteString(((String) value).getBytes(UTF_8));
LogicalType<?, ?> logicalType = checkArgumentNotNull(type.getLogicalType());
if (logicalType instanceof PassThroughLogicalType) {
return valueToByteString(value, logicalType.getBaseType());
} else {
throw new IllegalStateException("Unsupported logical type: " + identifier);
throw new IllegalStateException(
"Unsupported logical type: " + logicalType.getIdentifier());
}
default:
throw new IllegalStateException("Unsupported type: " + type.getTypeName());
Expand Down