Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions model/pipeline/src/main/proto/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ option java_outer_classname = "SchemaApi";
message Schema {
repeated Field fields = 1;
string id = 2;
repeated Option options = 3;
}

message Field {
Expand All @@ -39,6 +40,7 @@ message Field {
FieldType type = 3;
int32 id = 4;
int32 encoding_position = 5;
repeated Option options = 6;
}

message FieldType {
Expand Down Expand Up @@ -91,6 +93,12 @@ message LogicalType {
FieldValue argument = 5;
}

message Option {
string name = 1;
FieldType type = 2;
FieldValue value = 3;
}

message Row {
repeated FieldValue values = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public RowCoder fromComponents(List<Coder<?>> components, byte[] payload) {
components.isEmpty(), "Expected empty component list, but received: " + components);
Schema schema;
try {
schema = SchemaTranslation.fromProto(SchemaApi.Schema.parseFrom(payload));
schema = SchemaTranslation.schemaFromProto(SchemaApi.Schema.parseFrom(payload));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Unable to parse schema for RowCoder: ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co
} else if (s.equals(getUrn(StandardCoders.Enum.ROW))) {
Schema schema;
try {
schema = SchemaTranslation.fromProto(SchemaApi.Schema.parseFrom(coderSpec.getPayload()));
schema =
SchemaTranslation.schemaFromProto(SchemaApi.Schema.parseFrom(coderSpec.getPayload()));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Failed to parse schema payload for row coder", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -41,6 +46,37 @@ public class SchemaTranslationTest {
public static class ToFromProtoTest {
@Parameters(name = "{index}: {0}")
public static Iterable<Schema> data() {
Map<String, Integer> optionMap = new HashMap<>();
optionMap.put("string", 42);
List<String> optionList = new ArrayList<>();
optionList.add("string");
Row optionRow =
Row.withSchema(
Schema.builder()
.addField("field_one", FieldType.STRING)
.addField("field_two", FieldType.INT32)
.build())
.addValue("value")
.addValue(42)
.build();

Schema.Options.Builder optionsBuilder =
Schema.Options.builder()
.setOption("field_option_boolean", FieldType.BOOLEAN, true)
.setOption("field_option_byte", FieldType.BYTE, (byte) 12)
.setOption("field_option_int16", FieldType.INT16, (short) 12)
.setOption("field_option_int32", FieldType.INT32, 12)
.setOption("field_option_int64", FieldType.INT64, 12L)
.setOption("field_option_string", FieldType.STRING, "foo")
.setOption("field_option_bytes", FieldType.BYTES, new byte[] {0x42, 0x69, 0x00})
.setOption("field_option_float", FieldType.FLOAT, (float) 12.0)
.setOption("field_option_double", FieldType.DOUBLE, 12.0)
.setOption(
"field_option_map", FieldType.map(FieldType.STRING, FieldType.INT32), optionMap)
.setOption("field_option_array", FieldType.array(FieldType.STRING), optionList)
.setRowOption("field_option_row", optionRow)
.setOption("field_option_value", FieldType.STRING, "other");

return ImmutableList.<Schema>builder()
.add(Schema.of(Field.of("string", FieldType.STRING)))
.add(
Expand Down Expand Up @@ -77,6 +113,49 @@ public static Iterable<Schema> data() {
Schema.of(
Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME)))
.add(Schema.of(Field.of("logical", FieldType.logicalType(FixedBytes.of(24)))))
.add(
Schema.of(
Field.of("field_with_option_atomic", FieldType.STRING)
.withOptions(
Schema.Options.builder()
.setOption(
"field_option_atomic", FieldType.INT32, Integer.valueOf(42))
.build()))
.withOptions(
Schema.Options.builder()
.setOption("schema_option_atomic", FieldType.BOOLEAN, true)))
.add(
Schema.of(
Field.of("field_with_option_map", FieldType.STRING)
.withOptions(
Schema.Options.builder()
.setOption(
"field_option_map",
FieldType.map(FieldType.STRING, FieldType.INT32),
optionMap)))
.withOptions(
Schema.Options.builder()
.setOption(
"field_option_map",
FieldType.map(FieldType.STRING, FieldType.INT32),
optionMap)))
.add(
Schema.of(
Field.of("field_with_option_array", FieldType.STRING)
.withOptions(
Schema.Options.builder()
.setOption(
"field_option_array",
FieldType.array(FieldType.STRING),
optionList)
.build()))
.withOptions(
Schema.Options.builder()
.setOption(
"field_option_array", FieldType.array(FieldType.STRING), optionList)))
.add(
Schema.of(Field.of("field", FieldType.STRING).withOptions(optionsBuilder))
.withOptions(optionsBuilder))
.build();
}

Expand All @@ -87,7 +166,7 @@ public static Iterable<Schema> data() {
public void toAndFromProto() throws Exception {
SchemaApi.Schema schemaProto = SchemaTranslation.schemaToProto(schema, true);

Schema decodedSchema = SchemaTranslation.fromProto(schemaProto);
Schema decodedSchema = SchemaTranslation.schemaFromProto(schemaProto);
assertThat(decodedSchema, equalTo(schema));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public SchemaCoder fromCloudObject(CloudObject cloudObject) {
SchemaApi.Schema protoSchema =
SchemaApi.Schema.parseFrom(
StringUtils.jsonStringToByteArray(Structs.getString(cloudObject, SCHEMA)));
Schema schema = SchemaTranslation.fromProto(protoSchema);
Schema schema = SchemaTranslation.schemaFromProto(protoSchema);
return SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Loading