diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index 7c8bf6be3af2..24ba9f300c5f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -71,7 +71,7 @@ public FormatWriterFactory createWriterFactory(RowType type) {
 
     @Override
     public void validateDataFields(RowType rowType) {
-        ParquetSchemaConverter.convertToParquetMessageType("paimon_schema", rowType);
+        ParquetSchemaConverter.convertToParquetMessageType(rowType);
     }
 
     @Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index dac8940f9078..bd1ea5f8ca2a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -43,6 +43,7 @@
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Pool;
 
 import org.apache.parquet.ParquetReadOptions;
@@ -70,11 +71,10 @@
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_ELEMENT_NAME;
-import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_NAME;
-import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME;
 import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_REPEATED_NAME;
-import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.PAIMON_SCHEMA;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType;
 import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.buildFieldsList;
 import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createWritableColumnVector;
@@ -91,20 +91,15 @@ public class ParquetReaderFactory implements FormatReaderFactory {
     private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
 
     private final Options conf;
-
-    private final RowType projectedType;
-    private final String[] projectedColumnNames;
-    private final DataField[] projectedFields;
+    private final DataField[] readFields;
     private final int batchSize;
     private final FilterCompat.Filter filter;
     private final Set<Integer> unknownFieldsIndices = new HashSet<>();
 
     public ParquetReaderFactory(
-            Options conf, RowType projectedType, int batchSize, FilterCompat.Filter filter) {
+            Options conf, RowType readType, int batchSize, FilterCompat.Filter filter) {
         this.conf = conf;
-        this.projectedType = projectedType;
-        this.projectedColumnNames = projectedType.getFieldNames().toArray(new String[0]);
-        this.projectedFields = projectedType.getFields().toArray(new DataField[0]);
+        this.readFields = readType.getFields().toArray(new DataField[0]);
         this.batchSize = batchSize;
         this.filter = filter;
     }
@@ -131,8 +126,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
                 createPoolOfBatches(context.filePath(), requestedSchema);
 
         MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
-        List<ParquetField> fields =
-                buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);
+        List<ParquetField> fields = buildFieldsList(readFields, columnIO);
 
         return new ParquetReader(
                 reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
@@ -160,24 +154,23 @@ private void setReadOptions(ParquetReadOptions.Builder builder) {
 
     /** Clips `parquetSchema` according to `fieldNames`. */
     private MessageType clipParquetSchema(GroupType parquetSchema) {
-        Type[] types = new Type[projectedColumnNames.length];
-        for (int i = 0; i < projectedColumnNames.length; ++i) {
-            String fieldName = projectedColumnNames[i];
+        Type[] types = new Type[readFields.length];
+        for (int i = 0; i < readFields.length; ++i) {
+            String fieldName = readFields[i].name();
             if (!parquetSchema.containsField(fieldName)) {
                 LOG.warn(
                         "{} does not exist in {}, will fill the field with null.",
                         fieldName,
                         parquetSchema);
-                types[i] =
-                        ParquetSchemaConverter.convertToParquetType(fieldName, projectedFields[i]);
+                types[i] = ParquetSchemaConverter.convertToParquetType(readFields[i]);
                 unknownFieldsIndices.add(i);
             } else {
                 Type parquetType = parquetSchema.getType(fieldName);
-                types[i] = clipParquetType(projectedFields[i].type(), parquetType);
+                types[i] = clipParquetType(readFields[i].type(), parquetType);
             }
         }
 
-        return Types.buildMessage().addFields(types).named("paimon-parquet");
+        return Types.buildMessage().addFields(types).named(PAIMON_SCHEMA);
     }
 
     /** Clips `parquetType` by `readType`. */
@@ -201,25 +194,21 @@ private Type clipParquetType(DataType readType, Type parquetType) {
             case MAP:
                 MapType mapType = (MapType) readType;
                 GroupType mapGroup = (GroupType) parquetType;
-                GroupType keyValue = mapGroup.getType(MAP_REPEATED_NAME).asGroupType();
+                Pair<Type, Type> keyValueType = parquetMapKeyValueType(mapGroup);
                 return ConversionPatterns.mapType(
                         mapGroup.getRepetition(),
                         mapGroup.getName(),
                         MAP_REPEATED_NAME,
-                        clipParquetType(mapType.getKeyType(), keyValue.getType(MAP_KEY_NAME)),
-                        keyValue.containsField(MAP_VALUE_NAME)
-                                ? clipParquetType(
-                                        mapType.getValueType(), keyValue.getType(MAP_VALUE_NAME))
-                                : null);
+                        clipParquetType(mapType.getKeyType(), keyValueType.getLeft()),
+                        clipParquetType(mapType.getValueType(), keyValueType.getRight()));
             case ARRAY:
                 ArrayType arrayType = (ArrayType) readType;
                 GroupType arrayGroup = (GroupType) parquetType;
-                GroupType list = arrayGroup.getType(LIST_NAME).asGroupType();
                 return ConversionPatterns.listOfElements(
                         arrayGroup.getRepetition(),
                         arrayGroup.getName(),
                         clipParquetType(
-                                arrayType.getElementType(), list.getType(LIST_ELEMENT_NAME)));
+                                arrayType.getElementType(), parquetListElementType(arrayGroup)));
             default:
                 return parquetType;
         }
@@ -227,7 +216,7 @@ private Type clipParquetType(DataType readType, Type parquetType) {
 
     private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
             throws IOException, UnsupportedOperationException {
-        if (projectedColumnNames.length != requestedSchema.getFieldCount()) {
+        if (readFields.length != requestedSchema.getFieldCount()) {
             throw new RuntimeException(
                     "The quality of field type is incompatible with the request schema!");
         }
@@ -275,13 +264,13 @@ private ParquetReaderBatch createReaderBatch(
     }
 
     private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
-        WritableColumnVector[] columns = new WritableColumnVector[projectedFields.length];
+        WritableColumnVector[] columns = new WritableColumnVector[readFields.length];
         List<Type> types = requestedSchema.getFields();
-        for (int i = 0; i < projectedFields.length; i++) {
+        for (int i = 0; i < readFields.length; i++) {
             columns[i] =
                     createWritableColumnVector(
                             batchSize,
-                            projectedFields[i].type(),
+                            readFields[i].type(),
                             types.get(i),
                             requestedSchema.getColumns(),
                             0);
@@ -297,7 +286,7 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
             WritableColumnVector[] writableVectors) {
         ColumnVector[] vectors = new ColumnVector[writableVectors.length];
         for (int i = 0; i < writableVectors.length; i++) {
-            switch (projectedFields[i].type().getTypeRoot()) {
+            switch (readFields[i].type().getTypeRoot()) {
                 case DECIMAL:
                     vectors[i] =
                             new ParquetDecimalVector(
@@ -436,7 +425,7 @@ private void readNextRowGroup() throws IOException {
                 if (!unknownFieldsIndices.contains(i)) {
                     columnReaders[i] =
                             createColumnReader(
-                                    projectedFields[i].type(),
+                                    readFields[i].type(),
                                     types.get(i),
                                     requestedSchema.getColumns(),
                                     rowGroup,
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index ba6d4e5e0009..cb9415d50a66 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -23,6 +23,7 @@
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.LocalZonedTimestampType;
@@ -30,6 +31,7 @@
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Pair;
 
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
@@ -39,6 +41,9 @@
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
@@ -46,26 +51,30 @@
 /** Schema converter converts Parquet schema to and from Paimon internal types. */
 public class ParquetSchemaConverter {
 
+    static final String PAIMON_SCHEMA = "paimon_schema";
+
     static final String MAP_REPEATED_NAME = "key_value";
     static final String MAP_KEY_NAME = "key";
     static final String MAP_VALUE_NAME = "value";
-    static final String LIST_NAME = "list";
+    static final String LIST_REPEATED_NAME = "list";
     static final String LIST_ELEMENT_NAME = "element";
 
-    public static MessageType convertToParquetMessageType(String name, RowType rowType) {
-        return new MessageType(name, convertToParquetTypes(rowType));
-    }
-
-    public static Type convertToParquetType(String name, DataField field) {
-        return convertToParquetType(name, field.type(), field.id(), 0);
+    /** Convert paimon {@link RowType} to parquet {@link MessageType}. */
+    public static MessageType convertToParquetMessageType(RowType rowType) {
+        return new MessageType(PAIMON_SCHEMA, convertToParquetTypes(rowType));
     }
 
     private static Type[] convertToParquetTypes(RowType rowType) {
         return rowType.getFields().stream()
-                .map(f -> convertToParquetType(f.name(), f.type(), f.id(), 0))
+                .map(ParquetSchemaConverter::convertToParquetType)
                 .toArray(Type[]::new);
     }
 
+    /** Convert paimon {@link DataField} to parquet {@link Type}. */
+    public static Type convertToParquetType(DataField field) {
+        return convertToParquetType(field.name(), field.type(), field.id(), 0);
+    }
+
     private static Type convertToParquetType(String name, DataType type, int fieldId, int depth) {
         Type.Repetition repetition =
                 type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
@@ -260,4 +269,144 @@ public static boolean is32BitDecimal(int precision) {
     public static boolean is64BitDecimal(int precision) {
         return precision <= 18 && precision > 9;
     }
+
+    /** Convert parquet {@link MessageType} to paimon {@link RowType}. */
+    public static RowType convertToPaimonRowType(MessageType messageType) {
+        List<DataField> dataFields =
+                messageType.asGroupType().getFields().stream()
+                        .map(ParquetSchemaConverter::convertToPaimonField)
+                        .collect(Collectors.toList());
+        return new RowType(dataFields);
+    }
+
+    /** Convert parquet {@link Type} to paimon {@link DataField}. */
+    public static DataField convertToPaimonField(Type parquetType) {
+        LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
+        DataType paimonDataType;
+
+        if (parquetType.isPrimitive()) {
+            switch (parquetType.asPrimitiveType().getPrimitiveTypeName()) {
+                case BINARY:
+                    if (logicalType instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+                        paimonDataType = DataTypes.STRING();
+                    } else {
+                        paimonDataType = DataTypes.BYTES();
+                    }
+                    break;
+                case BOOLEAN:
+                    paimonDataType = DataTypes.BOOLEAN();
+                    break;
+                case FLOAT:
+                    paimonDataType = DataTypes.FLOAT();
+                    break;
+                case DOUBLE:
+                    paimonDataType = DataTypes.DOUBLE();
+                    break;
+                case INT32:
+                    if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
+                                (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+                        paimonDataType =
+                                new DecimalType(decimalType.getPrecision(), decimalType.getScale());
+                    } else if (logicalType
+                            instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
+                                (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
+                        int bitWidth = intType.getBitWidth();
+                        if (bitWidth == 8) {
+                            paimonDataType = DataTypes.TINYINT();
+                        } else if (bitWidth == 16) {
+                            paimonDataType = DataTypes.SMALLINT();
+                        } else {
+                            paimonDataType = DataTypes.INT();
+                        }
+                    } else if (logicalType
+                            instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
+                        paimonDataType = DataTypes.DATE();
+                    } else if (logicalType
+                            instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+                        paimonDataType = DataTypes.TIME();
+                    } else {
+                        paimonDataType = DataTypes.INT();
+                    }
+                    break;
+                case INT64:
+                    if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
+                                (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+                        paimonDataType =
+                                new DecimalType(decimalType.getPrecision(), decimalType.getScale());
+                    } else if (logicalType
+                            instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType =
+                                (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
+                        int precision =
+                                timestampType
+                                                .getUnit()
+                                                .equals(LogicalTypeAnnotation.TimeUnit.MILLIS)
+                                        ? 3
+                                        : 6;
+                        paimonDataType =
+                                timestampType.isAdjustedToUTC()
+                                        ? new LocalZonedTimestampType(precision)
+                                        : new TimestampType(precision);
+                    } else {
+                        paimonDataType = DataTypes.BIGINT();
+                    }
+                    break;
+                case INT96:
+                    paimonDataType = new TimestampType(9);
+                    break;
+                case FIXED_LEN_BYTE_ARRAY:
+                    LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
+                            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+                    paimonDataType =
+                            new DecimalType(decimalType.getPrecision(), decimalType.getScale());
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unsupported type: " + parquetType);
+            }
+            if (parquetType.getRepetition().equals(Type.Repetition.REQUIRED)) {
+                paimonDataType = paimonDataType.notNull();
+            }
+            return new DataField(
+                    parquetType.getId().intValue(), parquetType.getName(), paimonDataType);
+        } else {
+            GroupType groupType = parquetType.asGroupType();
+            if (logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+                paimonDataType =
+                        new ArrayType(
+                                convertToPaimonField(parquetListElementType(groupType)).type());
+            } else if (logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+                Pair<Type, Type> keyValueType = parquetMapKeyValueType(groupType);
+                paimonDataType =
+                        new MapType(
+                                // Since parquet does not support nullable key, when converting
+                                // back to Paimon, set as nullable by default.
+                                convertToPaimonField(keyValueType.getLeft()).type().nullable(),
+                                convertToPaimonField(keyValueType.getRight()).type());
+            } else {
+                paimonDataType =
+                        new RowType(
+                                groupType.getFields().stream()
+                                        .map(ParquetSchemaConverter::convertToPaimonField)
+                                        .collect(Collectors.toList()));
+            }
+        }
+
+        if (parquetType.getRepetition().equals(Type.Repetition.REQUIRED)) {
+            paimonDataType = paimonDataType.notNull();
+        }
+
+        return new DataField(parquetType.getId().intValue(), parquetType.getName(), paimonDataType);
+    }
+
+    public static Type parquetListElementType(GroupType listType) {
+        return listType.getType(LIST_REPEATED_NAME).asGroupType().getType(LIST_ELEMENT_NAME);
+    }
+
+    public static Pair<Type, Type> parquetMapKeyValueType(GroupType mapType) {
+        GroupType keyValue = mapType.getType(MAP_REPEATED_NAME).asGroupType();
+        return Pair.of(keyValue.getType(MAP_KEY_NAME), keyValue.getType(MAP_VALUE_NAME));
+    }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index e85e5086b67a..dda56a92a16c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -381,12 +381,10 @@ private static List<ColumnDescriptor> getAllColumnDescriptorByType(
     }
 
     public static List<ParquetField> buildFieldsList(
-            List<DataField> children, List<String> fieldNames, MessageColumnIO columnIO) {
+            DataField[] readFields, MessageColumnIO columnIO) {
         List<ParquetField> list = new ArrayList<>();
-        for (int i = 0; i < children.size(); i++) {
-            list.add(
-                    constructField(
-                            children.get(i), lookupColumnByName(columnIO, fieldNames.get(i))));
+        for (DataField readField : readFields) {
+            list.add(constructField(readField, lookupColumnByName(columnIO, readField.name())));
         }
         return list;
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
index 057cc7c325bd..9f3384c57864 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
@@ -55,7 +55,7 @@ protected WriteSupport<InternalRow> getWriteSupport(Configuration conf) {
 
     private class ParquetWriteSupport extends WriteSupport<InternalRow> {
 
-        private final MessageType schema = convertToParquetMessageType("paimon_schema", rowType);
+        private final MessageType schema = convertToParquetMessageType(rowType);
 
         private ParquetRowDataWriter writer;
 
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index c6728e338ffe..c78bda7e18f6 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -534,7 +534,7 @@ public void testConvertToParquetTypeWithId() {
                         .withId(baseId + depthLimit * 2 + 1);
         Type expected =
                 new MessageType(
-                        "table",
+                        ParquetSchemaConverter.PAIMON_SCHEMA,
                         Types.primitive(INT32, Type.Repetition.OPTIONAL).named("a").withId(0),
                         ConversionPatterns.listOfElements(
                                 Type.Repetition.OPTIONAL,
@@ -555,7 +555,7 @@ public void testConvertToParquetTypeWithId() {
                                         .withId(baseId - depthLimit * 2 - 1),
                                 outerMapValueType)
                         .withId(2));
-        Type actual = ParquetSchemaConverter.convertToParquetMessageType("table", rowType);
+        Type actual = ParquetSchemaConverter.convertToParquetMessageType(rowType);
 
         assertThat(actual).isEqualTo(expected);
     }
@@ -906,8 +906,7 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGroup
         Configuration conf = new Configuration();
         conf.setInt("parquet.block.size", rowGroupSize);
         MessageType schema =
-                ParquetSchemaConverter.convertToParquetMessageType(
-                        "paimon-parquet", NESTED_ARRAY_MAP_TYPE);
+                ParquetSchemaConverter.convertToParquetMessageType(NESTED_ARRAY_MAP_TYPE);
         try (ParquetWriter<Group> writer =
                 ExampleParquetWriter.builder(
                         HadoopOutputFile.fromPath(
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
new file mode 100644
index 000000000000..bfbdaed7c4a3
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToPaimonRowType;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType;
+import static org.apache.paimon.types.DataTypesTest.assertThat;
+
+/** Test for {@link ParquetSchemaConverter}. */
+public class ParquetSchemaConverterTest {
+
+    public static final RowType ALL_TYPES =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, "string", DataTypes.STRING()),
+                            new DataField(1, "stringNotNull", DataTypes.STRING().notNull()),
+                            new DataField(2, "boolean", DataTypes.BOOLEAN()),
+                            new DataField(3, "bytes", DataTypes.BYTES()),
+                            new DataField(4, "decimal(9,2)", DataTypes.DECIMAL(9, 2)),
+                            new DataField(5, "decimal(18,2)", DataTypes.DECIMAL(18, 2)),
+                            new DataField(6, "decimal(27,2)", DataTypes.DECIMAL(27, 2)),
+                            new DataField(7, "tinyint", DataTypes.TINYINT()),
+                            new DataField(8, "smallint", DataTypes.SMALLINT()),
+                            new DataField(9, "int", DataTypes.INT()),
+                            new DataField(10, "bigint", DataTypes.BIGINT()),
+                            new DataField(11, "float", DataTypes.FLOAT()),
+                            new DataField(12, "double", DataTypes.DOUBLE()),
+                            new DataField(13, "date", DataTypes.DATE()),
+                            new DataField(14, "time", DataTypes.TIME()),
+                            new DataField(15, "timestamp(3)", DataTypes.TIMESTAMP_MILLIS()),
+                            new DataField(16, "timestamp", DataTypes.TIMESTAMP()),
+                            new DataField(17, "timestampLtz(3)", DataTypes.TIMESTAMP_LTZ_MILLIS()),
+                            new DataField(
+                                    18, "timestampLtz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+                            new DataField(19, "array", new ArrayType(DataTypes.STRING())),
+                            new DataField(
+                                    20, "map", new MapType(DataTypes.STRING(), DataTypes.STRING())),
+                            new DataField(
+                                    21,
+                                    "row",
+                                    new RowType(
+                                            Arrays.asList(
+                                                    new DataField(
+                                                            22, "f1", DataTypes.INT().notNull()),
+                                                    new DataField(23, "f2", DataTypes.STRING())))),
+                            new DataField(
+                                    24,
+                                    "nested",
+                                    new RowType(
+                                            Arrays.asList(
+                                                    new DataField(
+                                                            25,
+                                                            "f1",
+                                                            new MapType(
+                                                                    DataTypes.STRING(),
+                                                                    new ArrayType(
+                                                                            DataTypes.STRING()))),
+                                                    new DataField(
+                                                            26,
+                                                            "f2",
+                                                            new RowType(
+                                                                            Arrays.asList(
+                                                                                    new DataField(
+                                                                                            27,
+                                                                                            "f1",
+                                                                                            DataTypes
+                                                                                                    .INT()
+                                                                                                    .notNull()),
+                                                                                    new DataField(
+                                                                                            28,
+                                                                                            "f2",
+                                                                                            DataTypes
+                                                                                                    .STRING())))
+                                                                    .notNull())))))));
+
+    @Test
+    public void testPaimonParquetSchemaConvert() {
+        MessageType messageType = convertToParquetMessageType(ALL_TYPES);
+        RowType rowType = convertToPaimonRowType(messageType);
+        assertThat(ALL_TYPES).isEqualTo(rowType);
+    }
+}
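
For reviewers, a minimal usage sketch of the converter API this patch introduces: `convertToParquetMessageType(RowType)`, `convertToParquetType(DataField)`, and the new reverse direction `convertToPaimonRowType(MessageType)`. It is not part of the change; the class and method names are taken from the diff above, while the surrounding class and `main` scaffolding are illustrative only.

```java
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import java.util.Arrays;

public class ConverterRoundTripSketch {

    public static void main(String[] args) {
        // A Paimon row type with explicit field ids, as used throughout the patch.
        RowType rowType =
                new RowType(
                        Arrays.asList(
                                new DataField(0, "id", DataTypes.BIGINT().notNull()),
                                new DataField(1, "name", DataTypes.STRING())));

        // RowType -> Parquet MessageType; the message name is now fixed to "paimon_schema".
        MessageType messageType = ParquetSchemaConverter.convertToParquetMessageType(rowType);

        // Single DataField -> Parquet Type, as ParquetReaderFactory now does for missing columns.
        Type idType = ParquetSchemaConverter.convertToParquetType(rowType.getFields().get(0));

        // Parquet MessageType -> Paimon RowType, the new reverse conversion added here.
        RowType roundTripped = ParquetSchemaConverter.convertToPaimonRowType(messageType);

        System.out.println(messageType);
        System.out.println(idType);
        System.out.println(roundTripped);
    }
}
```

This mirrors what `ParquetSchemaConverterTest#testPaimonParquetSchemaConvert` verifies: converting a `RowType` to a Parquet `MessageType` and back preserves field ids, names, types, and nullability, with map keys coming back nullable by design.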