From f6295846c9a55c557aac5ce8c549d4224e05fcdf Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 17 Sep 2019 17:25:54 -0700 Subject: [PATCH 1/4] Revert "Revert "Update portable schema representation and java SchemaTranslation (#8853)"" This reverts commit dbcb14c57f03b947fc43f244a62566a09ba3c39c. --- .../src/main/proto/beam_runner_api.proto | 46 ++++---- .../core/construction/SchemaTranslation.java | 108 +++++++++++------- .../construction/SchemaTranslationTest.java | 89 +++++++++++++++ 3 files changed, 178 insertions(+), 65 deletions(-) create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index aa3184d76f67..58f56f706eb5 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -647,29 +647,21 @@ message StandardCoders { // Experimental: A representation of a Beam Schema. message Schema { - enum TypeName { - BYTE = 0; - INT16 = 1; - INT32 = 2; - INT64 = 3; - DECIMAL = 4; + enum AtomicType { + UNSPECIFIED = 0; + BYTE = 1; + INT16 = 2; + INT32 = 3; + INT64 = 4; FLOAT = 5; DOUBLE = 6; STRING = 7; - DATETIME = 8; - BOOLEAN = 9; - BYTES = 10; - ARRAY = 11; - MAP = 13; - ROW = 14; - LOGICAL_TYPE = 15; + BOOLEAN = 8; + BYTES = 9; } - message LogicalType { - string id = 1; - string args = 2; - FieldType base_type = 3; - bytes serialized_class = 4; + message ArrayType { + FieldType element_type = 1; } message MapType { @@ -677,13 +669,23 @@ message Schema { FieldType value_type = 2; } + message RowType { + Schema schema = 1; + } + + message LogicalType { + string urn = 1; + string args = 2; + FieldType representation = 3; + } + message FieldType { - TypeName type_name = 1; - bool nullable = 2; + bool nullable = 1; oneof type_info { - FieldType collection_element_type = 3; + AtomicType atomic_type = 2; + ArrayType array_type = 3; MapType map_type = 4; - Schema row_schema = 5; + RowType row_type = 5; LogicalType logical_type = 6; } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java index 26b154d12ed2..dabe4e13588b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java @@ -25,33 +25,29 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableBiMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; /** Utility methods for translating schemas. */ public class SchemaTranslation { - private static final BiMap TYPE_NAME_MAPPING = - ImmutableBiMap.builder() - .put(TypeName.BYTE, RunnerApi.Schema.TypeName.BYTE) - .put(TypeName.INT16, RunnerApi.Schema.TypeName.INT16) - .put(TypeName.INT32, RunnerApi.Schema.TypeName.INT32) - .put(TypeName.INT64, RunnerApi.Schema.TypeName.INT64) - .put(TypeName.DECIMAL, RunnerApi.Schema.TypeName.DECIMAL) - .put(TypeName.FLOAT, RunnerApi.Schema.TypeName.FLOAT) - .put(TypeName.DOUBLE, RunnerApi.Schema.TypeName.DOUBLE) - .put(TypeName.STRING, RunnerApi.Schema.TypeName.STRING) - .put(TypeName.DATETIME, RunnerApi.Schema.TypeName.DATETIME) - .put(TypeName.BOOLEAN, RunnerApi.Schema.TypeName.BOOLEAN) - .put(TypeName.BYTES, RunnerApi.Schema.TypeName.BYTES) - .put(TypeName.ARRAY, RunnerApi.Schema.TypeName.ARRAY) - .put(TypeName.MAP, RunnerApi.Schema.TypeName.MAP) - .put(TypeName.ROW, RunnerApi.Schema.TypeName.ROW) - .put(TypeName.LOGICAL_TYPE, RunnerApi.Schema.TypeName.LOGICAL_TYPE) + + private static final BiMap ATOMIC_TYPE_MAPPING = + ImmutableBiMap.builder() + .put(TypeName.BYTE, RunnerApi.Schema.AtomicType.BYTE) + .put(TypeName.INT16, RunnerApi.Schema.AtomicType.INT16) + .put(TypeName.INT32, RunnerApi.Schema.AtomicType.INT32) + .put(TypeName.INT64, RunnerApi.Schema.AtomicType.INT64) + .put(TypeName.FLOAT, RunnerApi.Schema.AtomicType.FLOAT) + .put(TypeName.DOUBLE, RunnerApi.Schema.AtomicType.DOUBLE) + .put(TypeName.STRING, RunnerApi.Schema.AtomicType.STRING) + .put(TypeName.BOOLEAN, RunnerApi.Schema.AtomicType.BOOLEAN) + .put(TypeName.BYTES, RunnerApi.Schema.AtomicType.BYTES) .build(); + private static final String URN_BEAM_LOGICAL_DATETIME = "beam:fieldtype:datetime"; + private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:fieldtype:decimal"; + public static RunnerApi.Schema toProto(Schema schema) { String uuid = schema.getUUID() != null ? schema.getUUID().toString() : ""; RunnerApi.Schema.Builder builder = RunnerApi.Schema.newBuilder().setId(uuid); @@ -77,16 +73,17 @@ private static RunnerApi.Schema.Field toProto(Field field, int fieldId, int posi } private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) { - RunnerApi.Schema.FieldType.Builder builder = - RunnerApi.Schema.FieldType.newBuilder() - .setTypeName(TYPE_NAME_MAPPING.get(fieldType.getTypeName())); + RunnerApi.Schema.FieldType.Builder builder = RunnerApi.Schema.FieldType.newBuilder(); switch (fieldType.getTypeName()) { case ROW: - builder.setRowSchema(toProto(fieldType.getRowSchema())); + builder.setRowType( + RunnerApi.Schema.RowType.newBuilder().setSchema(toProto(fieldType.getRowSchema()))); break; case ARRAY: - builder.setCollectionElementType(toProto(fieldType.getCollectionElementType())); + builder.setArrayType( + RunnerApi.Schema.ArrayType.newBuilder() + .setElementType(toProto(fieldType.getCollectionElementType()))); break; case MAP: @@ -101,15 +98,29 @@ private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) { LogicalType logicalType = fieldType.getLogicalType(); builder.setLogicalType( RunnerApi.Schema.LogicalType.newBuilder() - .setId(logicalType.getIdentifier()) + .setUrn(logicalType.getIdentifier()) .setArgs(logicalType.getArgument()) - .setBaseType(toProto(logicalType.getBaseType())) - .setSerializedClass( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType))) + .setRepresentation(toProto(logicalType.getBaseType())) + .build()); + break; + // Special-case for DATETIME and DECIMAL which are logical types in portable representation, + // but not yet in Java. (BEAM-7554) + case DATETIME: + builder.setLogicalType( + RunnerApi.Schema.LogicalType.newBuilder() + .setUrn(URN_BEAM_LOGICAL_DATETIME) + .setRepresentation(toProto(FieldType.INT64)) + .build()); + break; + case DECIMAL: + builder.setLogicalType( + RunnerApi.Schema.LogicalType.newBuilder() + .setUrn(URN_BEAM_LOGICAL_DECIMAL) + .setRepresentation(toProto(FieldType.BYTES)) .build()); break; - default: + builder.setAtomicType(ATOMIC_TYPE_MAPPING.get(fieldType.getTypeName())); break; } builder.setNullable(fieldType.getNullable()); @@ -139,32 +150,43 @@ private static Field fieldFromProto(RunnerApi.Schema.Field protoField) { } private static FieldType fieldTypeFromProto(RunnerApi.Schema.FieldType protoFieldType) { - TypeName typeName = TYPE_NAME_MAPPING.inverse().get(protoFieldType.getTypeName()); FieldType fieldType; - switch (typeName) { - case ROW: - fieldType = FieldType.row(fromProto(protoFieldType.getRowSchema())); + switch (protoFieldType.getTypeInfoCase()) { + case ATOMIC_TYPE: + TypeName typeName = ATOMIC_TYPE_MAPPING.inverse().get(protoFieldType.getAtomicType()); + fieldType = FieldType.of(typeName); break; - case ARRAY: - fieldType = FieldType.array(fieldTypeFromProto(protoFieldType.getCollectionElementType())); + case ROW_TYPE: + fieldType = FieldType.row(fromProto(protoFieldType.getRowType().getSchema())); break; - case MAP: + case ARRAY_TYPE: + fieldType = + FieldType.array(fieldTypeFromProto(protoFieldType.getArrayType().getElementType())); + break; + case MAP_TYPE: fieldType = FieldType.map( fieldTypeFromProto(protoFieldType.getMapType().getKeyType()), fieldTypeFromProto(protoFieldType.getMapType().getValueType())); break; case LOGICAL_TYPE: - LogicalType logicalType = - (LogicalType) - SerializableUtils.deserializeFromByteArray( - protoFieldType.getLogicalType().getSerializedClass().toByteArray(), - "logicalType"); - fieldType = FieldType.logicalType(logicalType); + // Special-case for DATETIME and DECIMAL which are logical types in portable representation, + // but not yet in Java. (BEAM-7554) + String urn = protoFieldType.getLogicalType().getUrn(); + if (urn.equals(URN_BEAM_LOGICAL_DATETIME)) { + fieldType = FieldType.DATETIME; + } else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) { + fieldType = FieldType.DECIMAL; + } else { + // TODO: Look up logical type class by URN. + throw new IllegalArgumentException("Decoding logical types is not yet supported."); + } break; default: - fieldType = FieldType.of(typeName); + throw new IllegalArgumentException( + "Unexpected type_info: " + protoFieldType.getTypeInfoCase()); } + if (protoFieldType.getNullable()) { fieldType = fieldType.withNullable(true); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java new file mode 100644 index 000000000000..5293cfe9f783 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link SchemaTranslation}. */ +public class SchemaTranslationTest { + + /** Tests round-trip proto encodings for {@link Schema}. */ + @RunWith(Parameterized.class) + public static class ToFromProtoTest { + @Parameters(name = "{index}: {0}") + public static Iterable data() { + return ImmutableList.builder() + .add(Schema.of(Field.of("string", FieldType.STRING))) + .add( + Schema.of( + Field.of("boolean", FieldType.BOOLEAN), + Field.of("byte", FieldType.BYTE), + Field.of("int16", FieldType.INT16), + Field.of("int32", FieldType.INT32), + Field.of("int64", FieldType.INT64))) + .add( + Schema.of( + Field.of( + "row", + FieldType.row( + Schema.of( + Field.of("foo", FieldType.STRING), + Field.of("bar", FieldType.DOUBLE), + Field.of("baz", FieldType.BOOLEAN)))))) + .add( + Schema.of( + Field.of( + "array(array(int64)))", + FieldType.array(FieldType.array(FieldType.INT64.withNullable(true)))))) + .add( + Schema.of( + Field.of("nullable", FieldType.STRING.withNullable(true)), + Field.of("non_nullable", FieldType.STRING.withNullable(false)))) + .add( + Schema.of( + Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME))) + // Test for when Logical types are supported + // .add(Schema.of(Field.of("logical", + // FieldType.logicalType(LogicalTypes.FixedBytes.of(24))))) + .build(); + } + + @Parameter(0) + public Schema schema; + + @Test + public void toAndFromProto() throws Exception { + RunnerApi.Schema schemaProto = SchemaTranslation.toProto(schema); + + Schema decodedSchema = SchemaTranslation.fromProto(schemaProto); + assertThat(decodedSchema, equalTo(schema)); + } + } +} From 8f498dbe754977cad2a545c5068d14dfebf48fec Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 17 Sep 2019 17:26:15 -0700 Subject: [PATCH 2/4] Revert "Revert "Merge pull request #8943: Schema conversion cleanup"" This reverts commit c9da964d05d3feb7dff2cdafbda6e2800bf25376. --- .../src/main/proto/beam_runner_api.proto | 57 ------- model/pipeline/src/main/proto/schema.proto | 85 +++++++++++ .../core/construction/SchemaTranslation.java | 144 +++++++++++------- .../construction/SchemaTranslationTest.java | 4 +- .../SchemaCoderCloudObjectTranslator.java | 6 +- 5 files changed, 176 insertions(+), 120 deletions(-) create mode 100644 model/pipeline/src/main/proto/schema.proto diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 58f56f706eb5..736bcdc17bad 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -645,63 +645,6 @@ message StandardCoders { } } -// Experimental: A representation of a Beam Schema. -message Schema { - enum AtomicType { - UNSPECIFIED = 0; - BYTE = 1; - INT16 = 2; - INT32 = 3; - INT64 = 4; - FLOAT = 5; - DOUBLE = 6; - STRING = 7; - BOOLEAN = 8; - BYTES = 9; - } - - message ArrayType { - FieldType element_type = 1; - } - - message MapType { - FieldType key_type = 1; - FieldType value_type = 2; - } - - message RowType { - Schema schema = 1; - } - - message LogicalType { - string urn = 1; - string args = 2; - FieldType representation = 3; - } - - message FieldType { - bool nullable = 1; - oneof type_info { - AtomicType atomic_type = 2; - ArrayType array_type = 3; - MapType map_type = 4; - RowType row_type = 5; - LogicalType logical_type = 6; - } - } - - message Field { - string name = 1; - string description = 2; - FieldType type = 3; - int32 id = 4; - int32 encoding_position = 5; - } - - repeated Field fields = 1; - string id = 2; -} - // A windowing strategy describes the window function, triggering, allowed // lateness, and accumulation mode for a PCollection. // diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto new file mode 100644 index 000000000000..42629ebbcdc3 --- /dev/null +++ b/model/pipeline/src/main/proto/schema.proto @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ** Experimental ** +// Protocol Buffers describing Beam Schemas, a portable representation for +// complex types. + +syntax = "proto3"; + +package org.apache.beam.model.pipeline.v1; + +option go_package = "pipeline_v1"; +option java_package = "org.apache.beam.model.pipeline.v1"; +option java_outer_classname = "SchemaApi"; + +message Schema { + repeated Field fields = 1; + string id = 2; +} + +message Field { + string name = 1; + string description = 2; + FieldType type = 3; + int32 id = 4; + int32 encoding_position = 5; +} + +message FieldType { + bool nullable = 1; + oneof type_info { + AtomicType atomic_type = 2; + ArrayType array_type = 3; + MapType map_type = 4; + RowType row_type = 5; + LogicalType logical_type = 6; + } +} + +enum AtomicType { + UNSPECIFIED = 0; + BYTE = 1; + INT16 = 2; + INT32 = 3; + INT64 = 4; + FLOAT = 5; + DOUBLE = 6; + STRING = 7; + BOOLEAN = 8; + BYTES = 9; +} + +message ArrayType { + FieldType element_type = 1; +} + +message MapType { + FieldType key_type = 1; + FieldType value_type = 2; +} + +message RowType { + Schema schema = 1; +} + +message LogicalType { + string urn = 1; + string args = 2; + FieldType representation = 3; +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java index dabe4e13588b..e07a20a39d53 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java @@ -19,40 +19,24 @@ import java.util.Map; import java.util.UUID; -import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableBiMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; /** Utility methods for translating schemas. */ public class SchemaTranslation { - - private static final BiMap ATOMIC_TYPE_MAPPING = - ImmutableBiMap.builder() - .put(TypeName.BYTE, RunnerApi.Schema.AtomicType.BYTE) - .put(TypeName.INT16, RunnerApi.Schema.AtomicType.INT16) - .put(TypeName.INT32, RunnerApi.Schema.AtomicType.INT32) - .put(TypeName.INT64, RunnerApi.Schema.AtomicType.INT64) - .put(TypeName.FLOAT, RunnerApi.Schema.AtomicType.FLOAT) - .put(TypeName.DOUBLE, RunnerApi.Schema.AtomicType.DOUBLE) - .put(TypeName.STRING, RunnerApi.Schema.AtomicType.STRING) - .put(TypeName.BOOLEAN, RunnerApi.Schema.AtomicType.BOOLEAN) - .put(TypeName.BYTES, RunnerApi.Schema.AtomicType.BYTES) - .build(); - private static final String URN_BEAM_LOGICAL_DATETIME = "beam:fieldtype:datetime"; private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:fieldtype:decimal"; - public static RunnerApi.Schema toProto(Schema schema) { + public static SchemaApi.Schema toProto(Schema schema) { String uuid = schema.getUUID() != null ? schema.getUUID().toString() : ""; - RunnerApi.Schema.Builder builder = RunnerApi.Schema.newBuilder().setId(uuid); + SchemaApi.Schema.Builder builder = SchemaApi.Schema.newBuilder().setId(uuid); for (Field field : schema.getFields()) { - RunnerApi.Schema.Field protoField = + SchemaApi.Field protoField = toProto( field, schema.indexOf(field.getName()), @@ -62,8 +46,8 @@ public static RunnerApi.Schema toProto(Schema schema) { return builder.build(); } - private static RunnerApi.Schema.Field toProto(Field field, int fieldId, int position) { - return RunnerApi.Schema.Field.newBuilder() + private static SchemaApi.Field toProto(Field field, int fieldId, int position) { + return SchemaApi.Field.newBuilder() .setName(field.getName()) .setDescription(field.getDescription()) .setType(toProto(field.getType())) @@ -72,23 +56,23 @@ private static RunnerApi.Schema.Field toProto(Field field, int fieldId, int posi .build(); } - private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) { - RunnerApi.Schema.FieldType.Builder builder = RunnerApi.Schema.FieldType.newBuilder(); + private static SchemaApi.FieldType toProto(FieldType fieldType) { + SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder(); switch (fieldType.getTypeName()) { case ROW: builder.setRowType( - RunnerApi.Schema.RowType.newBuilder().setSchema(toProto(fieldType.getRowSchema()))); + SchemaApi.RowType.newBuilder().setSchema(toProto(fieldType.getRowSchema()))); break; case ARRAY: builder.setArrayType( - RunnerApi.Schema.ArrayType.newBuilder() + SchemaApi.ArrayType.newBuilder() .setElementType(toProto(fieldType.getCollectionElementType()))); break; case MAP: builder.setMapType( - RunnerApi.Schema.MapType.newBuilder() + SchemaApi.MapType.newBuilder() .setKeyType(toProto(fieldType.getMapKeyType())) .setValueType(toProto(fieldType.getMapValueType())) .build()); @@ -97,7 +81,7 @@ private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) { case LOGICAL_TYPE: LogicalType logicalType = fieldType.getLogicalType(); builder.setLogicalType( - RunnerApi.Schema.LogicalType.newBuilder() + SchemaApi.LogicalType.newBuilder() .setUrn(logicalType.getIdentifier()) .setArgs(logicalType.getArgument()) .setRepresentation(toProto(logicalType.getBaseType())) @@ -107,30 +91,54 @@ private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) { // but not yet in Java. (BEAM-7554) case DATETIME: builder.setLogicalType( - RunnerApi.Schema.LogicalType.newBuilder() + SchemaApi.LogicalType.newBuilder() .setUrn(URN_BEAM_LOGICAL_DATETIME) .setRepresentation(toProto(FieldType.INT64)) .build()); break; case DECIMAL: builder.setLogicalType( - RunnerApi.Schema.LogicalType.newBuilder() + SchemaApi.LogicalType.newBuilder() .setUrn(URN_BEAM_LOGICAL_DECIMAL) .setRepresentation(toProto(FieldType.BYTES)) .build()); break; - default: - builder.setAtomicType(ATOMIC_TYPE_MAPPING.get(fieldType.getTypeName())); + case BYTE: + builder.setAtomicType(SchemaApi.AtomicType.BYTE); + break; + case INT16: + builder.setAtomicType(SchemaApi.AtomicType.INT16); + break; + case INT32: + builder.setAtomicType(SchemaApi.AtomicType.INT32); + break; + case INT64: + builder.setAtomicType(SchemaApi.AtomicType.INT64); + break; + case FLOAT: + builder.setAtomicType(SchemaApi.AtomicType.FLOAT); + break; + case DOUBLE: + builder.setAtomicType(SchemaApi.AtomicType.DOUBLE); + break; + case STRING: + builder.setAtomicType(SchemaApi.AtomicType.STRING); + break; + case BOOLEAN: + builder.setAtomicType(SchemaApi.AtomicType.BOOLEAN); + break; + case BYTES: + builder.setAtomicType(SchemaApi.AtomicType.BYTES); break; } builder.setNullable(fieldType.getNullable()); return builder.build(); } - public static Schema fromProto(RunnerApi.Schema protoSchema) { + public static Schema fromProto(SchemaApi.Schema protoSchema) { Schema.Builder builder = Schema.builder(); Map encodingLocationMap = Maps.newHashMap(); - for (RunnerApi.Schema.Field protoField : protoSchema.getFieldsList()) { + for (SchemaApi.Field protoField : protoSchema.getFieldsList()) { Field field = fieldFromProto(protoField); builder.addField(field); encodingLocationMap.put(protoField.getName(), protoField.getEncodingPosition()); @@ -144,52 +152,72 @@ public static Schema fromProto(RunnerApi.Schema protoSchema) { return schema; } - private static Field fieldFromProto(RunnerApi.Schema.Field protoField) { + private static Field fieldFromProto(SchemaApi.Field protoField) { return Field.of(protoField.getName(), fieldTypeFromProto(protoField.getType())) .withDescription(protoField.getDescription()); } - private static FieldType fieldTypeFromProto(RunnerApi.Schema.FieldType protoFieldType) { - FieldType fieldType; + private static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) { + FieldType fieldType = fieldTypeFromProtoWithoutNullable(protoFieldType); + + if (protoFieldType.getNullable()) { + fieldType = fieldType.withNullable(true); + } + + return fieldType; + } + + private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType protoFieldType) { switch (protoFieldType.getTypeInfoCase()) { case ATOMIC_TYPE: - TypeName typeName = ATOMIC_TYPE_MAPPING.inverse().get(protoFieldType.getAtomicType()); - fieldType = FieldType.of(typeName); - break; + switch (protoFieldType.getAtomicType()) { + case BYTE: + return FieldType.of(TypeName.BYTE); + case INT16: + return FieldType.of(TypeName.INT16); + case INT32: + return FieldType.of(TypeName.INT32); + case INT64: + return FieldType.of(TypeName.INT64); + case FLOAT: + return FieldType.of(TypeName.FLOAT); + case DOUBLE: + return FieldType.of(TypeName.DOUBLE); + case STRING: + return FieldType.of(TypeName.STRING); + case BOOLEAN: + return FieldType.of(TypeName.BOOLEAN); + case BYTES: + return FieldType.of(TypeName.BYTES); + case UNSPECIFIED: + throw new IllegalArgumentException("Encountered UNSPECIFIED AtomicType"); + default: + throw new IllegalArgumentException( + "Encountered unknown AtomicType: " + protoFieldType.getAtomicType()); + } case ROW_TYPE: - fieldType = FieldType.row(fromProto(protoFieldType.getRowType().getSchema())); - break; + return FieldType.row(fromProto(protoFieldType.getRowType().getSchema())); case ARRAY_TYPE: - fieldType = - FieldType.array(fieldTypeFromProto(protoFieldType.getArrayType().getElementType())); - break; + return FieldType.array(fieldTypeFromProto(protoFieldType.getArrayType().getElementType())); case MAP_TYPE: - fieldType = - FieldType.map( - fieldTypeFromProto(protoFieldType.getMapType().getKeyType()), - fieldTypeFromProto(protoFieldType.getMapType().getValueType())); - break; + return FieldType.map( + fieldTypeFromProto(protoFieldType.getMapType().getKeyType()), + fieldTypeFromProto(protoFieldType.getMapType().getValueType())); case LOGICAL_TYPE: // Special-case for DATETIME and DECIMAL which are logical types in portable representation, // but not yet in Java. (BEAM-7554) String urn = protoFieldType.getLogicalType().getUrn(); if (urn.equals(URN_BEAM_LOGICAL_DATETIME)) { - fieldType = FieldType.DATETIME; + return FieldType.DATETIME; } else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) { - fieldType = FieldType.DECIMAL; + return FieldType.DECIMAL; } else { // TODO: Look up logical type class by URN. throw new IllegalArgumentException("Decoding logical types is not yet supported."); } - break; default: throw new IllegalArgumentException( "Unexpected type_info: " + protoFieldType.getTypeInfoCase()); } - - if (protoFieldType.getNullable()) { - fieldType = fieldType.withNullable(true); - } - return fieldType; } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java index 5293cfe9f783..2502d0b8b668 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java @@ -20,7 +20,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -80,7 +80,7 @@ public static Iterable data() { @Test public void toAndFromProto() throws Exception { - RunnerApi.Schema schemaProto = SchemaTranslation.toProto(schema); + SchemaApi.Schema schemaProto = SchemaTranslation.toProto(schema); Schema decodedSchema = SchemaTranslation.fromProto(schemaProto); assertThat(decodedSchema, equalTo(schema)); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java index 2395f12a2555..af73fd04570a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.dataflow.util; import java.io.IOException; -import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.runners.core.construction.SchemaTranslation; import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.schemas.Schema; @@ -72,8 +72,8 @@ public SchemaCoder fromCloudObject(CloudObject cloudObject) { StringUtils.jsonStringToByteArray( Structs.getString(cloudObject, FROM_ROW_FUNCTION)), "fromRowFunction"); - RunnerApi.Schema protoSchema = - RunnerApi.Schema.parseFrom( + SchemaApi.Schema protoSchema = + SchemaApi.Schema.parseFrom( StringUtils.jsonStringToByteArray(Structs.getString(cloudObject, SCHEMA))); Schema schema = SchemaTranslation.fromProto(protoSchema); return SchemaCoder.of(schema, toRowFunction, fromRowFunction); From 22f38527799f0863d74ee3d1e92011be2fd72c28 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 17 Sep 2019 18:01:06 -0700 Subject: [PATCH 3/4] Support all logical types by encoding serialized LogicalType instance --- model/pipeline/src/main/proto/schema.proto | 2 +- .../core/construction/SchemaTranslation.java | 20 +++++++++++++++---- .../construction/SchemaTranslationTest.java | 6 +++--- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto index 42629ebbcdc3..e420e3c91bac 100644 --- a/model/pipeline/src/main/proto/schema.proto +++ b/model/pipeline/src/main/proto/schema.proto @@ -80,6 +80,6 @@ message RowType { message LogicalType { string urn = 1; - string args = 2; + bytes payload = 2; FieldType representation = 3; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java index e07a20a39d53..677e480e7085 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java @@ -25,12 +25,16 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; /** Utility methods for translating schemas. */ public class SchemaTranslation { + private static final String URN_BEAM_LOGICAL_DATETIME = "beam:fieldtype:datetime"; private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:fieldtype:decimal"; + private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:fieldtype:javasdk"; public static SchemaApi.Schema toProto(Schema schema) { String uuid = schema.getUUID() != null ? schema.getUUID().toString() : ""; @@ -82,8 +86,12 @@ private static SchemaApi.FieldType toProto(FieldType fieldType) { LogicalType logicalType = fieldType.getLogicalType(); builder.setLogicalType( SchemaApi.LogicalType.newBuilder() - .setUrn(logicalType.getIdentifier()) - .setArgs(logicalType.getArgument()) + // TODO(BEAM-7855): "javasdk" types should only be a last resort. Types defined in + // Beam should have their own URN, and there should be a mechanism for users to + // register their own types by URN. + .setUrn(URN_BEAM_LOGICAL_JAVASDK) + .setPayload( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType))) .setRepresentation(toProto(logicalType.getBaseType())) .build()); break; @@ -211,9 +219,13 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p return FieldType.DATETIME; } else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) { return FieldType.DECIMAL; + } else if (urn.equals(URN_BEAM_LOGICAL_JAVASDK)) { + return FieldType.logicalType( + (LogicalType) + SerializableUtils.deserializeFromByteArray( + protoFieldType.getLogicalType().getPayload().toByteArray(), "logicalType")); } else { - // TODO: Look up logical type class by URN. - throw new IllegalArgumentException("Decoding logical types is not yet supported."); + throw new IllegalArgumentException("Encountered unsupported logical type URN: " + urn); } default: throw new IllegalArgumentException( diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java index 2502d0b8b668..1c827606c840 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertThat; import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.sdk.schemas.LogicalTypes; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -69,9 +70,8 @@ public static Iterable data() { .add( Schema.of( Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME))) - // Test for when Logical types are supported - // .add(Schema.of(Field.of("logical", - // FieldType.logicalType(LogicalTypes.FixedBytes.of(24))))) + .add( + Schema.of(Field.of("logical", FieldType.logicalType(LogicalTypes.FixedBytes.of(24))))) .build(); } From 7f53ee51cb3e167377b39aaac81987711c99a241 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Thu, 19 Sep 2019 10:33:06 -0700 Subject: [PATCH 4/4] adjust logical type urns, make toProto function names more descriptive --- .../core/construction/SchemaTranslation.java | 30 +++++++++---------- .../construction/SchemaTranslationTest.java | 2 +- .../SchemaCoderCloudObjectTranslator.java | 2 +- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java index 677e480e7085..6d6eb572480f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java @@ -32,16 +32,16 @@ /** Utility methods for translating schemas. */ public class SchemaTranslation { - private static final String URN_BEAM_LOGICAL_DATETIME = "beam:fieldtype:datetime"; - private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:fieldtype:decimal"; - private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:fieldtype:javasdk"; + private static final String URN_BEAM_LOGICAL_DATETIME = "beam:logical_type:datetime:v1"; + private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:logical_type:decimal:v1"; + private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1"; - public static SchemaApi.Schema toProto(Schema schema) { + public static SchemaApi.Schema schemaToProto(Schema schema) { String uuid = schema.getUUID() != null ? schema.getUUID().toString() : ""; SchemaApi.Schema.Builder builder = SchemaApi.Schema.newBuilder().setId(uuid); for (Field field : schema.getFields()) { SchemaApi.Field protoField = - toProto( + fieldToProto( field, schema.indexOf(field.getName()), schema.getEncodingPositions().get(field.getName())); @@ -50,35 +50,35 @@ public static SchemaApi.Schema toProto(Schema schema) { return builder.build(); } - private static SchemaApi.Field toProto(Field field, int fieldId, int position) { + private static SchemaApi.Field fieldToProto(Field field, int fieldId, int position) { return SchemaApi.Field.newBuilder() .setName(field.getName()) .setDescription(field.getDescription()) - .setType(toProto(field.getType())) + .setType(fieldTypeToProto(field.getType())) .setId(fieldId) .setEncodingPosition(position) .build(); } - private static SchemaApi.FieldType toProto(FieldType fieldType) { + private static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType) { SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder(); switch (fieldType.getTypeName()) { case ROW: builder.setRowType( - SchemaApi.RowType.newBuilder().setSchema(toProto(fieldType.getRowSchema()))); + SchemaApi.RowType.newBuilder().setSchema(schemaToProto(fieldType.getRowSchema()))); break; case ARRAY: builder.setArrayType( SchemaApi.ArrayType.newBuilder() - .setElementType(toProto(fieldType.getCollectionElementType()))); + .setElementType(fieldTypeToProto(fieldType.getCollectionElementType()))); break; case MAP: builder.setMapType( SchemaApi.MapType.newBuilder() - .setKeyType(toProto(fieldType.getMapKeyType())) - .setValueType(toProto(fieldType.getMapValueType())) + .setKeyType(fieldTypeToProto(fieldType.getMapKeyType())) + .setValueType(fieldTypeToProto(fieldType.getMapValueType())) .build()); break; @@ -92,7 +92,7 @@ private static SchemaApi.FieldType toProto(FieldType fieldType) { .setUrn(URN_BEAM_LOGICAL_JAVASDK) .setPayload( ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType))) - .setRepresentation(toProto(logicalType.getBaseType())) + .setRepresentation(fieldTypeToProto(logicalType.getBaseType())) .build()); break; // Special-case for DATETIME and DECIMAL which are logical types in portable representation, @@ -101,14 +101,14 @@ private static SchemaApi.FieldType toProto(FieldType fieldType) { builder.setLogicalType( SchemaApi.LogicalType.newBuilder() .setUrn(URN_BEAM_LOGICAL_DATETIME) - .setRepresentation(toProto(FieldType.INT64)) + .setRepresentation(fieldTypeToProto(FieldType.INT64)) .build()); break; case DECIMAL: builder.setLogicalType( SchemaApi.LogicalType.newBuilder() .setUrn(URN_BEAM_LOGICAL_DECIMAL) - .setRepresentation(toProto(FieldType.BYTES)) + .setRepresentation(fieldTypeToProto(FieldType.BYTES)) .build()); break; case BYTE: diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java index 1c827606c840..20208146843e 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SchemaTranslationTest.java @@ -80,7 +80,7 @@ public static Iterable data() { @Test public void toAndFromProto() throws Exception { - SchemaApi.Schema schemaProto = SchemaTranslation.toProto(schema); + SchemaApi.Schema schemaProto = SchemaTranslation.schemaToProto(schema); Schema decodedSchema = SchemaTranslation.fromProto(schemaProto); assertThat(decodedSchema, equalTo(schema)); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java index af73fd04570a..2ff8c771f79c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java @@ -52,7 +52,7 @@ public CloudObject toCloudObject(SchemaCoder target, SdkComponents sdkComponents base, SCHEMA, StringUtils.byteArrayToJsonString( - SchemaTranslation.toProto(target.getSchema()).toByteArray())); + SchemaTranslation.schemaToProto(target.getSchema()).toByteArray())); return base; }