Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -647,43 +647,45 @@ message StandardCoders {

// Experimental: A representation of a Beam Schema.
message Schema {
enum TypeName {
BYTE = 0;
INT16 = 1;
INT32 = 2;
INT64 = 3;
DECIMAL = 4;
enum AtomicType {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't 0 in an enum supposed to be reserved for unknown? It is wise, because of defaulting in proto libs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to my knowledge specifically adding an UNSPECIFIED with a value of 0 will make this clearer.
For example:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

UNSPECIFIED = 0;
BYTE = 1;
INT16 = 2;
INT32 = 3;
INT64 = 4;
FLOAT = 5;
DOUBLE = 6;
STRING = 7;
DATETIME = 8;
BOOLEAN = 9;
BYTES = 10;
ARRAY = 11;
MAP = 13;
ROW = 14;
LOGICAL_TYPE = 15;
BOOLEAN = 8;
BYTES = 9;
}

message LogicalType {
string id = 1;
string args = 2;
FieldType base_type = 3;
bytes serialized_class = 4;
message ArrayType {
FieldType element_type = 1;
}

message MapType {
FieldType key_type = 1;
FieldType value_type = 2;
}

message RowType {
Schema schema = 1;
}

message LogicalType {
string urn = 1;
string args = 2;
FieldType representation = 3;
}

message FieldType {
TypeName type_name = 1;
bool nullable = 2;
bool nullable = 1;
oneof type_info {
FieldType collection_element_type = 3;
AtomicType atomic_type = 2;
ArrayType array_type = 3;
MapType map_type = 4;
Schema row_schema = 5;
RowType row_type = 5;
LogicalType logical_type = 6;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,29 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;

/** Utility methods for translating schemas. */
public class SchemaTranslation {
private static final BiMap<TypeName, RunnerApi.Schema.TypeName> TYPE_NAME_MAPPING =
ImmutableBiMap.<TypeName, RunnerApi.Schema.TypeName>builder()
.put(TypeName.BYTE, RunnerApi.Schema.TypeName.BYTE)
.put(TypeName.INT16, RunnerApi.Schema.TypeName.INT16)
.put(TypeName.INT32, RunnerApi.Schema.TypeName.INT32)
.put(TypeName.INT64, RunnerApi.Schema.TypeName.INT64)
.put(TypeName.DECIMAL, RunnerApi.Schema.TypeName.DECIMAL)
.put(TypeName.FLOAT, RunnerApi.Schema.TypeName.FLOAT)
.put(TypeName.DOUBLE, RunnerApi.Schema.TypeName.DOUBLE)
.put(TypeName.STRING, RunnerApi.Schema.TypeName.STRING)
.put(TypeName.DATETIME, RunnerApi.Schema.TypeName.DATETIME)
.put(TypeName.BOOLEAN, RunnerApi.Schema.TypeName.BOOLEAN)
.put(TypeName.BYTES, RunnerApi.Schema.TypeName.BYTES)
.put(TypeName.ARRAY, RunnerApi.Schema.TypeName.ARRAY)
.put(TypeName.MAP, RunnerApi.Schema.TypeName.MAP)
.put(TypeName.ROW, RunnerApi.Schema.TypeName.ROW)
.put(TypeName.LOGICAL_TYPE, RunnerApi.Schema.TypeName.LOGICAL_TYPE)

private static final BiMap<TypeName, RunnerApi.Schema.AtomicType> ATOMIC_TYPE_MAPPING =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I find a switch clearer than a map lookup, and it takes the same amount of code space. Not for this PR, in which you are just editing the existing structure not restructuring.

ImmutableBiMap.<TypeName, RunnerApi.Schema.AtomicType>builder()
.put(TypeName.BYTE, RunnerApi.Schema.AtomicType.BYTE)
.put(TypeName.INT16, RunnerApi.Schema.AtomicType.INT16)
.put(TypeName.INT32, RunnerApi.Schema.AtomicType.INT32)
.put(TypeName.INT64, RunnerApi.Schema.AtomicType.INT64)
.put(TypeName.FLOAT, RunnerApi.Schema.AtomicType.FLOAT)
.put(TypeName.DOUBLE, RunnerApi.Schema.AtomicType.DOUBLE)
.put(TypeName.STRING, RunnerApi.Schema.AtomicType.STRING)
.put(TypeName.BOOLEAN, RunnerApi.Schema.AtomicType.BOOLEAN)
.put(TypeName.BYTES, RunnerApi.Schema.AtomicType.BYTES)
.build();

private static final String URN_BEAM_LOGICAL_DATETIME = "beam:fieldtype:datetime";
private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:fieldtype:decimal";

public static RunnerApi.Schema toProto(Schema schema) {
String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
RunnerApi.Schema.Builder builder = RunnerApi.Schema.newBuilder().setId(uuid);
Expand All @@ -77,16 +73,17 @@ private static RunnerApi.Schema.Field toProto(Field field, int fieldId, int posi
}

private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) {
RunnerApi.Schema.FieldType.Builder builder =
RunnerApi.Schema.FieldType.newBuilder()
.setTypeName(TYPE_NAME_MAPPING.get(fieldType.getTypeName()));
RunnerApi.Schema.FieldType.Builder builder = RunnerApi.Schema.FieldType.newBuilder();
switch (fieldType.getTypeName()) {
case ROW:
builder.setRowSchema(toProto(fieldType.getRowSchema()));
builder.setRowType(
RunnerApi.Schema.RowType.newBuilder().setSchema(toProto(fieldType.getRowSchema())));
break;

case ARRAY:
builder.setCollectionElementType(toProto(fieldType.getCollectionElementType()));
builder.setArrayType(
RunnerApi.Schema.ArrayType.newBuilder()
.setElementType(toProto(fieldType.getCollectionElementType())));
break;

case MAP:
Expand All @@ -101,15 +98,29 @@ private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) {
LogicalType logicalType = fieldType.getLogicalType();
builder.setLogicalType(
RunnerApi.Schema.LogicalType.newBuilder()
.setId(logicalType.getIdentifier())
.setUrn(logicalType.getIdentifier())
.setArgs(logicalType.getArgument())
.setBaseType(toProto(logicalType.getBaseType()))
.setSerializedClass(
ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType)))
.setRepresentation(toProto(logicalType.getBaseType()))
.build());
break;
// Special-case for DATETIME and DECIMAL which are logical types in portable representation,
// but not yet in Java. (BEAM-7554)
case DATETIME:
builder.setLogicalType(
RunnerApi.Schema.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_DATETIME)
.setRepresentation(toProto(FieldType.INT64))
.build());
break;
case DECIMAL:
builder.setLogicalType(
RunnerApi.Schema.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_DECIMAL)
.setRepresentation(toProto(FieldType.BYTES))
.build());
break;

default:
builder.setAtomicType(ATOMIC_TYPE_MAPPING.get(fieldType.getTypeName()));
break;
}
builder.setNullable(fieldType.getNullable());
Expand Down Expand Up @@ -139,32 +150,43 @@ private static Field fieldFromProto(RunnerApi.Schema.Field protoField) {
}

private static FieldType fieldTypeFromProto(RunnerApi.Schema.FieldType protoFieldType) {
TypeName typeName = TYPE_NAME_MAPPING.inverse().get(protoFieldType.getTypeName());
FieldType fieldType;
switch (typeName) {
case ROW:
fieldType = FieldType.row(fromProto(protoFieldType.getRowSchema()));
switch (protoFieldType.getTypeInfoCase()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another not-for-this PR comment that this would be cleaner with the switch in a function so the branches could all return.

case ATOMIC_TYPE:
TypeName typeName = ATOMIC_TYPE_MAPPING.inverse().get(protoFieldType.getAtomicType());
fieldType = FieldType.of(typeName);
break;
case ARRAY:
fieldType = FieldType.array(fieldTypeFromProto(protoFieldType.getCollectionElementType()));
case ROW_TYPE:
fieldType = FieldType.row(fromProto(protoFieldType.getRowType().getSchema()));
break;
case MAP:
case ARRAY_TYPE:
fieldType =
FieldType.array(fieldTypeFromProto(protoFieldType.getArrayType().getElementType()));
break;
case MAP_TYPE:
fieldType =
FieldType.map(
fieldTypeFromProto(protoFieldType.getMapType().getKeyType()),
fieldTypeFromProto(protoFieldType.getMapType().getValueType()));
break;
case LOGICAL_TYPE:
LogicalType logicalType =
(LogicalType)
SerializableUtils.deserializeFromByteArray(
protoFieldType.getLogicalType().getSerializedClass().toByteArray(),
"logicalType");
fieldType = FieldType.logicalType(logicalType);
// Special-case for DATETIME and DECIMAL which are logical types in portable representation,
// but not yet in Java. (BEAM-7554)
String urn = protoFieldType.getLogicalType().getUrn();
if (urn.equals(URN_BEAM_LOGICAL_DATETIME)) {
fieldType = FieldType.DATETIME;
} else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) {
fieldType = FieldType.DECIMAL;
} else {
// TODO: Look up logical type class by URN.
throw new IllegalArgumentException("Decoding logical types is not yet supported.");
}
break;
default:
fieldType = FieldType.of(typeName);
throw new IllegalArgumentException(
"Unexpected type_info: " + protoFieldType.getTypeInfoCase());
}

if (protoFieldType.getNullable()) {
fieldType = fieldType.withNullable(true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/** Tests for {@link SchemaTranslation}. */
public class SchemaTranslationTest {

/** Tests round-trip proto encodings for {@link Schema}. */
@RunWith(Parameterized.class)
public static class ToFromProtoTest {
@Parameters(name = "{index}: {0}")
public static Iterable<Schema> data() {
return ImmutableList.<Schema>builder()
.add(Schema.of(Field.of("string", FieldType.STRING)))
.add(
Schema.of(
Field.of("boolean", FieldType.BOOLEAN),
Field.of("byte", FieldType.BYTE),
Field.of("int16", FieldType.INT16),
Field.of("int32", FieldType.INT32),
Field.of("int64", FieldType.INT64)))
.add(
Schema.of(
Field.of(
"row",
FieldType.row(
Schema.of(
Field.of("foo", FieldType.STRING),
Field.of("bar", FieldType.DOUBLE),
Field.of("baz", FieldType.BOOLEAN))))))
.add(
Schema.of(
Field.of(
"array(array(int64)))",
FieldType.array(FieldType.array(FieldType.INT64.withNullable(true))))))
.add(
Schema.of(
Field.of("nullable", FieldType.STRING.withNullable(true)),
Field.of("non_nullable", FieldType.STRING.withNullable(false))))
.add(
Schema.of(
Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME)))
// Test for when Logical types are supported
// .add(Schema.of(Field.of("logical",
// FieldType.logicalType(LogicalTypes.FixedBytes.of(24)))))
.build();
}

@Parameter(0)
public Schema schema;

@Test
public void toAndFromProto() throws Exception {
RunnerApi.Schema schemaProto = SchemaTranslation.toProto(schema);

Schema decodedSchema = SchemaTranslation.fromProto(schemaProto);
assertThat(decodedSchema, equalTo(schema));
}
}
}