-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[BEAM-8307] NPE in Calcite dialect when input PCollection has logical… #11581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.schemas.logicaltypes; | ||
|
|
||
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; | ||
|
|
||
| import org.apache.beam.sdk.annotations.Experimental; | ||
| import org.apache.beam.sdk.schemas.Schema; | ||
|
|
||
| /** A LogicalType representing a fixed-length string. */ | ||
| @Experimental(Experimental.Kind.SCHEMAS) | ||
| public class FixedLengthString extends IdenticalBaseTAndInputTLogicalType<String> { | ||
| public static final String IDENTIFIER = "beam:logical_type:fixed_length_string:v1"; | ||
| private final int length; | ||
|
|
||
| private FixedLengthString(int length) { | ||
| super(IDENTIFIER, Schema.FieldType.INT32, length, Schema.FieldType.STRING); | ||
| this.length = length; | ||
| } | ||
|
|
||
| public int getLength() { | ||
| return length; | ||
| } | ||
|
|
||
| public static FixedLengthString of(int length) { | ||
| return new FixedLengthString(length); | ||
| } | ||
|
|
||
| @Override | ||
| public String toBaseType(String input) { | ||
| checkArgument(input == null || input.length() == length); | ||
| return input; | ||
| } | ||
|
|
||
| @Override | ||
| public String toInputType(String base) { | ||
| checkArgument(base == null || base.length() == length); | ||
| return base; | ||
| } | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add constants for these new logical types in SqlTypes? cc: @robinyqiu |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.schemas.logicaltypes; | ||
|
|
||
| import java.util.Objects; | ||
| import org.apache.beam.sdk.annotations.Experimental; | ||
| import org.apache.beam.sdk.schemas.Schema; | ||
|
|
||
| /** A base class for LogicalTypes that use the same input type as the underlying base type. */ | ||
| @Experimental(Experimental.Kind.SCHEMAS) | ||
| public abstract class IdenticalBaseTAndInputTLogicalType<T> implements Schema.LogicalType<T, T> { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you make this package-private?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you make this package-private? I think its only used in |
||
| protected final String identifier; | ||
| protected final Schema.FieldType argumentType; | ||
| protected final Object argument; | ||
| protected final Schema.FieldType baseType; | ||
|
|
||
| protected IdenticalBaseTAndInputTLogicalType( | ||
| String identifier, | ||
| Schema.FieldType argumentType, | ||
| Object argument, | ||
| Schema.FieldType baseType) { | ||
| this.identifier = identifier; | ||
| this.argumentType = argumentType; | ||
| this.argument = argument; | ||
| this.baseType = baseType; | ||
| } | ||
|
|
||
| @Override | ||
| public String getIdentifier() { | ||
| return identifier; | ||
| } | ||
|
|
||
| @Override | ||
| public Schema.FieldType getArgumentType() { | ||
| return argumentType; | ||
| } | ||
|
|
||
| @Override | ||
| @SuppressWarnings("TypeParameterUnusedInFormals") | ||
| public <ArgumentT> ArgumentT getArgument() { | ||
| return (ArgumentT) argument; | ||
| } | ||
|
|
||
| @Override | ||
| public Schema.FieldType getBaseType() { | ||
| return baseType; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| IdenticalBaseTAndInputTLogicalType<?> that = (IdenticalBaseTAndInputTLogicalType<?>) o; | ||
| return identifier.equals(that.identifier) | ||
| && argument.equals(that.argument) | ||
| && baseType.equals(that.baseType); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(identifier, argument, baseType); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.schemas.logicaltypes; | ||
|
|
||
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; | ||
|
|
||
| import java.math.BigDecimal; | ||
| import org.apache.beam.sdk.annotations.Experimental; | ||
| import org.apache.beam.sdk.schemas.Schema; | ||
| import org.apache.beam.sdk.values.Row; | ||
|
|
||
| /** A LogicalType representing a Decimal type with custom precision and scale. */ | ||
| @Experimental(Experimental.Kind.SCHEMAS) | ||
| public class LogicalDecimal extends IdenticalBaseTAndInputTLogicalType<BigDecimal> { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just call this |
||
| public static final String IDENTIFIER = "beam:logical_type:decimal:v1"; | ||
| private static final Schema schema = | ||
| Schema.builder().addInt32Field("precision").addInt32Field("scale").build(); | ||
| private final int precision; | ||
| private final int scale; | ||
|
|
||
| private LogicalDecimal(int precision, int scale) { | ||
| super( | ||
| IDENTIFIER, | ||
| Schema.FieldType.row(schema), | ||
| Row.withSchema(schema).addValues(precision, scale).build(), | ||
| Schema.FieldType.DECIMAL); | ||
| this.precision = precision; | ||
| this.scale = scale; | ||
| } | ||
|
|
||
| public static LogicalDecimal of(int precision, int scale) { | ||
| return new LogicalDecimal(precision, scale); | ||
| } | ||
|
|
||
| public int getPrecision() { | ||
| return precision; | ||
| } | ||
|
|
||
| public int getScale() { | ||
| return scale; | ||
| } | ||
|
|
||
| @Override | ||
| public BigDecimal toBaseType(BigDecimal input) { | ||
| checkArgument(input == null || (input.precision() == precision && input.scale() == scale)); | ||
| return input; | ||
| } | ||
|
|
||
| @Override | ||
| public BigDecimal toInputType(BigDecimal base) { | ||
| checkArgument(base == null || (base.precision() == precision && base.scale() == scale)); | ||
| return base; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.schemas.logicaltypes; | ||
|
|
||
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; | ||
|
|
||
| import org.apache.beam.sdk.annotations.Experimental; | ||
| import org.apache.beam.sdk.schemas.Schema; | ||
|
|
||
| /** A LogicalType representing a variable-size byte array. */ | ||
| @Experimental(Experimental.Kind.SCHEMAS) | ||
| public class VariableLengthBytes extends IdenticalBaseTAndInputTLogicalType<byte[]> { | ||
| public static final String IDENTIFIER = "beam:logical_type:variable_length_bytes:v1"; | ||
| private final int byteArraySize; | ||
|
|
||
| private VariableLengthBytes(int byteArraySize) { | ||
| super(IDENTIFIER, Schema.FieldType.INT32, byteArraySize, Schema.FieldType.BYTES); | ||
| this.byteArraySize = byteArraySize; | ||
| } | ||
|
|
||
| public static VariableLengthBytes of(int byteArraySize) { | ||
| return new VariableLengthBytes(byteArraySize); | ||
| } | ||
|
|
||
| public int getLength() { | ||
| return byteArraySize; | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] toBaseType(byte[] input) { | ||
| checkArgument(input == null || input.length <= byteArraySize); | ||
| return input; | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] toInputType(byte[] base) { | ||
| checkArgument(base == null || base.length <= byteArraySize); | ||
| return base; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI @reuvenlax this PR is removing the ability to convert smaller byte arrays. As noted in #11609 (comment) it seems this logic is inaccessible anyway.