Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ public static <T extends JsonNode> T getNodeAs(
fieldName, clazz.getName(), node.getClass().getName()));
}

public static <T> T fromJson(String json, TypeReference<T> typeReference) {
try {
return OBJECT_MAPPER_INSTANCE.readValue(json, typeReference);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static <T> T fromJson(String json, Class<T> clazz) {
try {
return OBJECT_MAPPER_INSTANCE.reader().readValue(json, clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class HiveSchema {
private static final Logger LOG = LoggerFactory.getLogger(HiveSchema.class);
private final RowType rowType;

private HiveSchema(RowType rowType) {
HiveSchema(RowType rowType) {
this.rowType = rowType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
package org.apache.paimon.hive;

import org.apache.paimon.hive.objectinspector.PaimonInternalRowObjectInspector;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
Expand All @@ -32,6 +36,7 @@

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Properties;

Expand All @@ -53,11 +58,20 @@ public class PaimonSerDe extends AbstractSerDe {
@Override
public void initialize(@Nullable Configuration configuration, Properties properties)
throws SerDeException {
HiveSchema schema = HiveSchema.extract(configuration, properties);
this.tableSchema = schema;
String dataFieldStr = properties.getProperty(PaimonStorageHandler.PAIMON_TABLE_FIELDS);
if (dataFieldStr != null) {
List<DataField> dataFields =
JsonSerdeUtil.fromJson(dataFieldStr, new TypeReference<List<DataField>>() {});
this.tableSchema = new HiveSchema(new RowType(dataFields));
} else {
this.tableSchema = HiveSchema.extract(configuration, properties);
}

inspector =
new PaimonInternalRowObjectInspector(
schema.fieldNames(), schema.fieldTypes(), schema.fieldComments());
tableSchema.fieldNames(),
tableSchema.fieldTypes(),
tableSchema.fieldComments());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.hive.mapred.PaimonInputFormat;
import org.apache.paimon.hive.mapred.PaimonOutputCommitter;
import org.apache.paimon.hive.mapred.PaimonOutputFormat;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
Expand All @@ -46,6 +47,8 @@ public class PaimonStorageHandler implements HiveStoragePredicateHandler, HiveSt
private static final String MAPRED_OUTPUT_COMMITTER = "mapred.output.committer.class";
private static final String PAIMON_WRITE = "paimon.write";

public static final String PAIMON_TABLE_FIELDS = "paimon.table.fields";

private Configuration conf;

@Override
Expand Down Expand Up @@ -76,9 +79,15 @@ public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
Properties properties = tableDesc.getProperties();
map.put(
LocationKeyExtractor.INTERNAL_LOCATION,
LocationKeyExtractor.getPaimonLocation(conf, properties));
String paimonLocation = LocationKeyExtractor.getPaimonLocation(conf, properties);
map.put(LocationKeyExtractor.INTERNAL_LOCATION, paimonLocation);
String dataFieldJsonStr = getDataFieldsJsonStr(properties);
tableDesc.getProperties().put(PAIMON_TABLE_FIELDS, dataFieldJsonStr);
}

static String getDataFieldsJsonStr(Properties properties) {
HiveSchema hiveSchema = HiveSchema.extract(null, properties);
return JsonSerdeUtil.toJson(hiveSchema.fields());
}

public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> map) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -342,4 +347,22 @@ private Properties createTableWithExistsDDL() {
properties.setProperty("location", tempDir.toString());
return properties;
}

@Test
public void testReadHiveSchemaFromProperties() throws Exception {
createSchema();
// cache the TableSchema to properties
Properties properties = new Properties();
properties.put(hive_metastoreConstants.META_TABLE_LOCATION, tempDir.toString());

HiveSchema hiveSchema = HiveSchema.extract(null, properties);

List<DataField> dataFields = hiveSchema.fields();
String dataFieldStr = JsonSerdeUtil.toJson(dataFields);

List<DataField> dataFieldsDeserialized =
JsonSerdeUtil.fromJson(dataFieldStr, new TypeReference<List<DataField>>() {});
HiveSchema newHiveSchema = new HiveSchema(new RowType(dataFieldsDeserialized));
assertThat(newHiveSchema).usingRecursiveComparison().isEqualTo(hiveSchema);
}
}