diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/SchemaUtils.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/SchemaUtils.java
index ea2a2bf3f3bb..bc1562e0f630 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/SchemaUtils.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/SchemaUtils.java
@@ -27,29 +27,30 @@
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
 
 /** Utils to convert between HCatalog schema types and Beam schema types. */
 @Experimental(Kind.SCHEMAS)
 class SchemaUtils {
 
-  private static final Map<String, FieldType> PRIMITIVE_SERDE_TYPES_MAP =
-      ImmutableMap.<String, FieldType>builder()
-          .put(serdeConstants.BINARY_TYPE_NAME, FieldType.BYTES)
-          .put(serdeConstants.BOOLEAN_TYPE_NAME, FieldType.BOOLEAN)
-          .put(serdeConstants.TINYINT_TYPE_NAME, FieldType.BYTE)
-          .put(serdeConstants.CHAR_TYPE_NAME, FieldType.STRING)
-          .put(serdeConstants.DATE_TYPE_NAME, FieldType.DATETIME)
-          .put(serdeConstants.DATETIME_TYPE_NAME, FieldType.DATETIME)
-          .put(serdeConstants.DECIMAL_TYPE_NAME, FieldType.DECIMAL)
-          .put(serdeConstants.DOUBLE_TYPE_NAME, FieldType.DOUBLE)
-          .put(serdeConstants.FLOAT_TYPE_NAME, FieldType.FLOAT)
-          .put(serdeConstants.INT_TYPE_NAME, FieldType.INT32)
-          .put(serdeConstants.BIGINT_TYPE_NAME, FieldType.INT64)
-          .put(serdeConstants.SMALLINT_TYPE_NAME, FieldType.INT16)
-          .put(serdeConstants.STRING_TYPE_NAME, FieldType.STRING)
-          .put(serdeConstants.TIMESTAMP_TYPE_NAME, FieldType.DATETIME)
-          .put(serdeConstants.VARCHAR_TYPE_NAME, FieldType.STRING)
+  private static final Map<HCatFieldSchema.Type, FieldType> HCAT_TO_BEAM_TYPES_MAP =
+      ImmutableMap.<HCatFieldSchema.Type, FieldType>builder()
+          .put(HCatFieldSchema.Type.BOOLEAN, FieldType.BOOLEAN)
+          .put(HCatFieldSchema.Type.TINYINT, FieldType.BYTE)
+          .put(HCatFieldSchema.Type.SMALLINT, FieldType.INT16)
+          .put(HCatFieldSchema.Type.INT, FieldType.INT32)
+          .put(HCatFieldSchema.Type.BIGINT, FieldType.INT64)
+          .put(HCatFieldSchema.Type.FLOAT, FieldType.FLOAT)
+          .put(HCatFieldSchema.Type.DOUBLE, FieldType.DOUBLE)
+          .put(HCatFieldSchema.Type.DECIMAL, FieldType.DECIMAL)
+          .put(HCatFieldSchema.Type.STRING, FieldType.STRING)
+          .put(HCatFieldSchema.Type.CHAR, FieldType.STRING)
+          .put(HCatFieldSchema.Type.VARCHAR, FieldType.STRING)
+          .put(HCatFieldSchema.Type.BINARY, FieldType.BYTES)
+          .put(HCatFieldSchema.Type.DATE, FieldType.DATETIME)
+          .put(HCatFieldSchema.Type.TIMESTAMP, FieldType.DATETIME)
           .build();
 
   static Schema toBeamSchema(List<FieldSchema> fields) {
@@ -58,12 +59,35 @@ static Schema toBeamSchema(List<FieldSchema> fields) {
 
   private static Schema.Field toBeamField(FieldSchema field) {
     String name = field.getName();
-    if (!PRIMITIVE_SERDE_TYPES_MAP.containsKey(field.getType())) {
+    HCatFieldSchema hCatFieldSchema;
+
+    try {
+      hCatFieldSchema = HCatSchemaUtils.getHCatFieldSchema(field);
+    } catch (HCatException e) {
+      // Convert the checked exception into an unchecked exception.
       throw new UnsupportedOperationException(
-          "The type '" + field.getType() + "' of field '" + name + "' is not supported.");
+          "Error while converting FieldSchema to HCatFieldSchema", e);
     }
 
-    FieldType fieldType = PRIMITIVE_SERDE_TYPES_MAP.get(field.getType());
-    return Schema.Field.of(name, fieldType).withNullable(true);
+    switch (hCatFieldSchema.getCategory()) {
+      case PRIMITIVE:
+        {
+          if (!HCAT_TO_BEAM_TYPES_MAP.containsKey(hCatFieldSchema.getType())) {
+            throw new UnsupportedOperationException(
+                "The primitive HCat type '"
+                    + field.getType()
+                    + "' of field '"
+                    + name
+                    + "' cannot be converted to a Beam FieldType");
+          }
+
+          FieldType fieldType = HCAT_TO_BEAM_TYPES_MAP.get(hCatFieldSchema.getType());
+          return Schema.Field.of(name, fieldType).withNullable(true);
+        }
+        // TODO: Add support for complex types, i.e. ARRAY, MAP, STRUCT.
+      default:
+        throw new UnsupportedOperationException(
+            "The category '" + hCatFieldSchema.getCategory() + "' is not supported.");
+    }
   }
 }
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/SchemaUtilsTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/SchemaUtilsTest.java
new file mode 100644
index 000000000000..5b748da86773
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/SchemaUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SchemaUtilsTest {
+  @Test
+  public void testParameterizedTypesToBeamTypes() {
+    List<FieldSchema> listOfFieldSchema = new ArrayList<>();
+    listOfFieldSchema.add(new FieldSchema("parameterizedChar", "char(10)", null));
+    listOfFieldSchema.add(new FieldSchema("parameterizedVarchar", "varchar(100)", null));
+    listOfFieldSchema.add(new FieldSchema("parameterizedDecimal", "decimal(30,16)", null));
+
+    Schema expectedSchema =
+        Schema.builder()
+            .addNullableField("parameterizedChar", Schema.FieldType.STRING)
+            .addNullableField("parameterizedVarchar", Schema.FieldType.STRING)
+            .addNullableField("parameterizedDecimal", Schema.FieldType.DECIMAL)
+            .build();
+
+    Schema actualSchema = SchemaUtils.toBeamSchema(listOfFieldSchema);
+    Assert.assertEquals(expectedSchema, actualSchema);
+  }
+}
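Note (not part of the patch): until the TODO for complex types is implemented, the new default branch in toBeamField rejects any non-primitive HCat category. A minimal JUnit 4 sketch of how that behavior could be pinned down from the same test class is below; the test name and the array<string> column are illustrative assumptions, and it relies on the imports already present in SchemaUtilsTest.

  // Illustrative sketch only: a complex HCat type (here an array column) is expected to be
  // rejected by SchemaUtils.toBeamField until complex-type support is added.
  @Test(expected = UnsupportedOperationException.class)
  public void testComplexTypeIsNotSupportedYet() {
    List<FieldSchema> listOfFieldSchema = new ArrayList<>();
    listOfFieldSchema.add(new FieldSchema("tags", "array<string>", null));

    // The ARRAY category falls through to the default branch and throws.
    SchemaUtils.toBeamSchema(listOfFieldSchema);
  }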