Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.logicaltypes;

import java.time.LocalDate;
import org.apache.beam.sdk.schemas.Schema;

/**
* A date without a time-zone.
*
* <p>It cannot represent an instant on the time-line without additional information such as an
Member

I think it is worth documenting that the Long is an offset from an epoch (and what that epoch is).

Contributor Author

Done.

* offset or time-zone.
*
* <p>Its input type is a {@link LocalDate}, and base type is a {@link Long} that represents an
* incrementing count of days where day 0 is 1970-01-01 (ISO).
*/
public class Date implements Schema.LogicalType<LocalDate, Long> {
Member

If I'm reading this correctly, LocalDate is the in-memory type (a struct) and Long is the wire format (an offset from epoch)? This conversion could be quite expensive. It appears the Calc nodes both take an offset in this case, so when we start to think about performance we might need to change the in-memory type to be offset-based.

Member

(If changing the in-memory type is going to be difficult in the future, consider doing that now.)

Member

That is unfortunate... but what in-memory type should we use instead? joda.time.LocalDate uses a millisecond long, do we want to add another joda dependency?

We could access the base type (wire format type) directly in SQL with Row#getBaseValue, but unfortunately Rows store logical types as the input type (in memory format type), so that wouldn't actually avoid a conversion.

Member

I guess java.sql.Date is another option for a java type backed by millis.

Member

We should consider not using a JVM, it adds performance overhead too. 🤓

I'm reasonably convinced the wire format is good and the conversion here is lossless, so if there isn't an easy drop-in replacement, leave this as is.

Contributor Author

I think Andrew is basically suggesting using a PassThroughLogicalType<Long> as a logical type for DATE. I think we could definitely consider this if performance becomes a problem in the future. (It's not easy to change the in-memory type for Date after it is made public, but we can easily define a new SqlDate.) For now I think we can leave it as is. It's more human readable (e.g. writing tests for DATE type in spec tests is simpler).


@Override
public String getIdentifier() {
return "beam:logical_type:date:v1";
}

// unused
@Override
public Schema.FieldType getArgumentType() {
return Schema.FieldType.STRING;
}

// unused
@Override
public String getArgument() {
return "";
}

@Override
public Schema.FieldType getBaseType() {
return Schema.FieldType.INT64;
}

@Override
public Long toBaseType(LocalDate input) {
return input == null ? null : input.toEpochDay();
}

@Override
public LocalDate toInputType(Long base) {
return base == null ? null : LocalDate.ofEpochDay(base);
}
}
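
The conversion debated in the review thread above can be tried in isolation with nothing but `java.time`. The following is a minimal sketch, not the Beam class itself: `EpochDayRoundTrip`, `toBase`, and `toInput` are hypothetical names that mirror the null-safe `toBaseType` and `toInputType` methods from the diff.

```java
import java.time.LocalDate;

/** Hypothetical stand-alone helper for illustration; not part of the Beam SDK. */
final class EpochDayRoundTrip {
  private EpochDayRoundTrip() {}

  /** Null-safe LocalDate to epoch-day count (day 0 is 1970-01-01). */
  static Long toBase(LocalDate input) {
    return input == null ? null : input.toEpochDay();
  }

  /** Null-safe epoch-day count back to LocalDate. */
  static LocalDate toInput(Long base) {
    return base == null ? null : LocalDate.ofEpochDay(base);
  }

  public static void main(String[] args) {
    LocalDate date = LocalDate.of(2020, 3, 15);
    Long days = toBase(date);
    // Converting back should give the date we started with.
    System.out.println(days + " -> " + toInput(days));
  }
}
```

Because both directions use whole days, a value that goes through `toBase` and then `toInput` should come back unchanged, which is the "lossless" property mentioned in the thread.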
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.logicaltypes;

import java.time.LocalDate;
import org.apache.beam.sdk.schemas.Schema.LogicalType;

/** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType}s corresponding to SQL data types. */
public class SqlTypes {

private SqlTypes() {}

/** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */
public static final LogicalType<LocalDate, Long> DATE = new Date();
}
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Strings;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;

@@ -38,7 +39,7 @@ class SchemaUtils {
ImmutableMap.<String, FieldType>builder()
.put("BOOL", FieldType.BOOLEAN)
.put("BYTES", FieldType.BYTES)
.put("DATE", FieldType.logicalType(new CalciteUtils.DateType()))
.put("DATE", FieldType.logicalType(SqlTypes.DATE))
.put("DATETIME", FieldType.DATETIME)
.put("DOUBLE", FieldType.DOUBLE)
.put("FLOAT", FieldType.DOUBLE)
Original file line number Diff line number Diff line change
@@ -20,13 +20,13 @@
import static org.apache.beam.sdk.schemas.Schema.FieldType;
import static org.apache.beam.sdk.schemas.Schema.TypeName;
import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.Arrays;
@@ -39,11 +39,11 @@
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimestampWithLocalTzType;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -315,7 +315,7 @@ private static Expression castOutput(Expression value, FieldType toType) {
private static Expression castOutputTime(Expression value, FieldType toType) {
Expression valueDateTime = value;

// First, convert to millis
// First, convert to millis (except for DATE type)
if (CalciteUtils.TIMESTAMP.typesEqual(toType)
|| CalciteUtils.NULLABLE_TIMESTAMP.typesEqual(toType)) {
if (value.getType() == java.sql.Timestamp.class) {
@@ -331,13 +331,16 @@ private static Expression castOutputTime(Expression value, FieldType toType) {
if (value.getType() == java.sql.Date.class) {
valueDateTime = Expressions.call(BuiltInMethod.DATE_TO_INT.method, valueDateTime);
}
valueDateTime = Expressions.multiply(valueDateTime, Expressions.constant(MILLIS_PER_DAY));
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + toType);
}

// Second, convert to joda Instant
valueDateTime = Expressions.new_(Instant.class, valueDateTime);
// Second, convert to joda Instant (or LocalDate for DATE type)
if (CalciteUtils.DATE.typesEqual(toType) || CalciteUtils.NULLABLE_DATE.typesEqual(toType)) {
valueDateTime = Expressions.call(LocalDate.class, "ofEpochDay", valueDateTime);
} else {
valueDateTime = Expressions.new_(Instant.class, valueDateTime);
}

// Third, make conversion conditional on non-null input.
if (!((Class) value.getType()).isPrimitive()) {
@@ -371,9 +374,9 @@ private static class InputGetterImpl implements RexToLixTranslator.InputGetter {
.put(TypeName.ROW, Row.class)
.build();

private static final Map<String, Class> LOGICAL_TYPE_CONVERSION_MAP =
private static final Map<String, Class> LOGICAL_TYPE_TO_BASE_TYPE_MAP =
ImmutableMap.<String, Class>builder()
.put(DateType.IDENTIFIER, ReadableInstant.class)
.put(SqlTypes.DATE.getIdentifier(), Long.class)
.put(TimeType.IDENTIFIER, ReadableInstant.class)
.put(TimeWithLocalTzType.IDENTIFIER, ReadableInstant.class)
.put(TimestampWithLocalTzType.IDENTIFIER, ReadableInstant.class)
@@ -406,7 +409,7 @@ private static Expression value(
if (storageType == Object.class) {
convertTo = Object.class;
} else if (fromType.getTypeName().isLogicalType()) {
convertTo = LOGICAL_TYPE_CONVERSION_MAP.get(fromType.getLogicalType().getIdentifier());
convertTo = LOGICAL_TYPE_TO_BASE_TYPE_MAP.get(fromType.getLogicalType().getIdentifier());
} else {
convertTo = TYPE_CONVERSION_MAP.get(fromType.getTypeName());
}
@@ -427,18 +430,13 @@ private static Expression value(

private static Expression value(Expression value, Schema.FieldType type) {
if (type.getTypeName().isLogicalType()) {
Expression millisField = Expressions.call(value, "getMillis");
String logicalId = type.getLogicalType().getIdentifier();
if (logicalId.equals(TimeType.IDENTIFIER)) {
return nullOr(value, Expressions.convert_(millisField, int.class));
} else if (logicalId.equals(DateType.IDENTIFIER)) {
value =
nullOr(
value,
Expressions.convert_(
Expressions.divide(millisField, Expressions.constant(MILLIS_PER_DAY)),
int.class));
} else if (!logicalId.equals(CharType.IDENTIFIER)) {
if (TimeType.IDENTIFIER.equals(logicalId)) {
return nullOr(
value, Expressions.convert_(Expressions.call(value, "getMillis"), int.class));
} else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
value = nullOr(value, value);
} else if (!CharType.IDENTIFIER.equals(logicalId)) {
throw new UnsupportedOperationException(
"Unknown LogicalType " + type.getLogicalType().getIdentifier());
}
Original file line number Diff line number Diff line change
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;

import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY;

import java.io.IOException;
import java.time.LocalDate;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -36,7 +36,6 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.Counter;
@@ -50,6 +49,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -303,11 +303,15 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
switch (type.getTypeName()) {
case LOGICAL_TYPE:
String logicalId = type.getLogicalType().getIdentifier();
if (logicalId.equals(TimeType.IDENTIFIER)) {
if (TimeType.IDENTIFIER.equals(logicalId)) {
return (int) ((ReadableInstant) beamValue).getMillis();
} else if (logicalId.equals(DateType.IDENTIFIER)) {
return (int) (((ReadableInstant) beamValue).getMillis() / MILLIS_PER_DAY);
} else if (logicalId.equals(CharType.IDENTIFIER)) {
} else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
if (beamValue instanceof Long) { // base type
return ((Long) beamValue).intValue();
} else { // input type
return (int) (((LocalDate) beamValue).toEpochDay());
}
} else if (CharType.IDENTIFIER.equals(logicalId)) {
return beamValue;
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + logicalId);
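
The DATE branch in the hunk above accepts a value in either representation: the base type (`Long` epoch days) or the input type (`LocalDate`). A minimal sketch of that dispatch, assuming a hypothetical `AvaticaDateSketch.toAvaticaDays` helper; unlike the diff, which casts to `LocalDate` unconditionally in the `else` branch, this sketch checks the type explicitly and rejects anything else.

```java
import java.time.LocalDate;

/** Hypothetical helper for illustration; not part of Beam or Calcite. */
final class AvaticaDateSketch {
  private AvaticaDateSketch() {}

  /** Returns the day count since 1970-01-01 as an int for either representation. */
  static int toAvaticaDays(Object beamValue) {
    if (beamValue instanceof Long) {
      // Base type: already an epoch-day count, narrowed to int as in the diff.
      return ((Long) beamValue).intValue();
    } else if (beamValue instanceof LocalDate) {
      // Input type: convert the date to its epoch-day count first.
      return (int) ((LocalDate) beamValue).toEpochDay();
    }
    // Assumption of this sketch: other types are reported rather than cast blindly.
    throw new IllegalArgumentException("Unsupported DATE value: " + beamValue);
  }

  public static void main(String[] args) {
    System.out.println(toAvaticaDays(1L));
    System.out.println(toAvaticaDays(LocalDate.of(1970, 1, 2)));
  }
}
```

Both narrowing conversions silently truncate day counts outside the `int` range, which matches the behavior of the casts in the diff.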
Original file line number Diff line number Diff line change
@@ -22,7 +22,10 @@
import java.io.IOException;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -114,8 +117,17 @@ public static Object autoCastField(Schema.Field field, Object rawObj) {
} else {
return rawObj;
}
} else if (CalciteUtils.DATE.typesEqual(type) || CalciteUtils.NULLABLE_DATE.typesEqual(type)) {
if (rawObj instanceof GregorianCalendar) { // used by the SQL CLI
GregorianCalendar calendar = (GregorianCalendar) rawObj;
return Instant.ofEpochMilli(calendar.getTimeInMillis())
.atZone(calendar.getTimeZone().toZoneId())
.toLocalDate();
} else {
return LocalDate.ofEpochDay((Integer) rawObj);
}
} else if (CalciteUtils.isDateTimeType(type)) {
// Internal representation of DateType in Calcite is convertible to Joda's Datetime.
// Internal representation of Date in Calcite is convertible to Joda's Datetime.
return new DateTime(rawObj);
} else if (type.getTypeName().isNumericType()
&& ((rawObj instanceof String)
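
The `GregorianCalendar` branch in the hunk above resolves the calendar's instant in the calendar's own time zone before taking the date. A minimal sketch of why the zone matters, using only JDK classes; `CalendarToLocalDate` and `toLocalDate` are hypothetical names, and the Tokyo example is an assumption chosen for illustration.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

/** Hypothetical helper for illustration; mirrors the conversion chain in the diff. */
final class CalendarToLocalDate {
  private CalendarToLocalDate() {}

  /** Converts the calendar's instant to a date in the calendar's own zone. */
  static LocalDate toLocalDate(GregorianCalendar calendar) {
    return Instant.ofEpochMilli(calendar.getTimeInMillis())
        .atZone(calendar.getTimeZone().toZoneId())
        .toLocalDate();
  }

  public static void main(String[] args) {
    // 00:30 on 15 March in Tokyo is still 14 March in UTC.
    GregorianCalendar tokyo = new GregorianCalendar(TimeZone.getTimeZone("Asia/Tokyo"));
    tokyo.clear();
    tokyo.set(2020, Calendar.MARCH, 15, 0, 30, 0);
    System.out.println(toLocalDate(tokyo));
  }
}
```

Dividing the epoch millis by the number of millis per day would interpret the instant in UTC and could land on the previous or next day for calendars in other zones.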
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
@@ -43,15 +44,6 @@ public class CalciteUtils {
// SQL has schema types that do not directly correspond to Beam Schema types. We define
// LogicalTypes to represent each of these types.

/** A LogicalType corresponding to DATE. */
public static class DateType extends PassThroughLogicalType<Instant> {
public static final String IDENTIFIER = "SqlDateType";

public DateType() {
super(IDENTIFIER, FieldType.STRING, "", FieldType.DATETIME);
}
}

/** A LogicalType corresponding to TIME. */
public static class TimeType extends PassThroughLogicalType<Instant> {
public static final String IDENTIFIER = "SqlTimeType";
@@ -96,7 +88,7 @@ public static boolean isDateTimeType(FieldType fieldType) {

if (fieldType.getTypeName().isLogicalType()) {
String logicalId = fieldType.getLogicalType().getIdentifier();
return logicalId.equals(DateType.IDENTIFIER)
return logicalId.equals(SqlTypes.DATE.getIdentifier())
|| logicalId.equals(TimeType.IDENTIFIER)
|| logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
|| logicalId.equals(TimestampWithLocalTzType.IDENTIFIER);
@@ -128,9 +120,9 @@ public static boolean isStringType(FieldType fieldType) {
public static final FieldType VARBINARY = FieldType.BYTES;
public static final FieldType VARCHAR = FieldType.STRING;
public static final FieldType CHAR = FieldType.logicalType(new CharType());
public static final FieldType DATE = FieldType.logicalType(new DateType());
public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE);
public static final FieldType NULLABLE_DATE =
FieldType.logicalType(new DateType()).withNullable(true);
FieldType.logicalType(SqlTypes.DATE).withNullable(true);
public static final FieldType TIME = FieldType.logicalType(new TimeType());
public static final FieldType NULLABLE_TIME =
FieldType.logicalType(new TimeType()).withNullable(true);
@@ -205,12 +197,16 @@ public static SqlTypeName toSqlTypeName(FieldType type) {
return SqlTypeName.MAP;
default:
SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(type.withNullable(false));
if (typeName != null) {
return typeName;
} else {
if (typeName == null) {
// This will happen e.g. if looking up a STRING type, and metadata isn't set to say which
// type of SQL string we want. In this case, use the default mapping.
return BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
typeName = BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
}
if (typeName == null) {
throw new IllegalArgumentException(
String.format("Cannot find a matching Calcite SqlTypeName for Beam type: %s", type));
} else {
return typeName;
}
}
}
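
The reworked default branch of `toSqlTypeName` above tries the specific mapping first, then the default mapping, and throws if neither has an entry instead of returning `null`. A minimal sketch of that lookup order, assuming plain string keys and values in place of `FieldType` and `SqlTypeName`; `TwoStageLookup` and its map contents are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of a two-stage lookup that fails loudly. */
final class TwoStageLookup {
  private TwoStageLookup() {}

  static String lookup(Map<String, String> primary, Map<String, String> defaults, String key) {
    String value = primary.get(key);
    if (value == null) {
      // Fall back to the default mapping when the specific one has no entry.
      value = defaults.get(key);
    }
    if (value == null) {
      // Surface the missing mapping here rather than as a later NullPointerException.
      throw new IllegalArgumentException(
          String.format("Cannot find a matching value for key: %s", key));
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, String> primary = new HashMap<>();
    primary.put("INT64", "BIGINT");
    Map<String, String> defaults = new HashMap<>();
    defaults.put("STRING", "VARCHAR");
    System.out.println(lookup(primary, defaults, "INT64"));
    System.out.println(lookup(primary, defaults, "STRING"));
  }
}
```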