Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,

const rapidjson::Value& str_col = is_nested_str ? col[0] : col;

RETURN_ERROR_IF_COL_IS_ARRAY(col, type);

const std::string& val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
StringParser::ParseResult result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,20 @@ public static ObjectNode getMapping(String indexMapping) {
}

@VisibleForTesting
public static ObjectNode getRootSchema(ObjectNode mappings, String mappingType) {
public static ObjectNode getRootSchema(ObjectNode mappings, String mappingType, List<String> arrayFields) {
// Type is null in the following three cases
// 1. Equal 6.8.x and after
// 2. Multi-catalog auto infer
// 3. Equal 6.8.x and before user not passed
if (mappingType == null) {
// remove dynamic templates, for ES 7.x and 8.x
checkNonPropertiesFields(mappings);
checkNonPropertiesFields(mappings, arrayFields);
String firstType = mappings.fieldNames().next();
if (!"properties".equals(firstType)) {
// If type is not passed in takes the first type.
ObjectNode firstData = (ObjectNode) mappings.get(firstType);
// check for ES 6.x and before
checkNonPropertiesFields(firstData);
checkNonPropertiesFields(firstData, arrayFields);
return firstData;
}
// Equal 7.x and after
Expand All @@ -123,11 +123,11 @@ public static ObjectNode getRootSchema(ObjectNode mappings, String mappingType)
if (mappings.has(mappingType)) {
ObjectNode jsonData = (ObjectNode) mappings.get(mappingType);
// check for ES 6.x and before
checkNonPropertiesFields(jsonData);
checkNonPropertiesFields(jsonData, arrayFields);
return jsonData;
}
// Compatible type error
return getRootSchema(mappings, null);
return getRootSchema(mappings, null, arrayFields);
}
}

Expand All @@ -136,9 +136,21 @@ public static ObjectNode getRootSchema(ObjectNode mappings, String mappingType)
*
* @param mappings
*/
private static void checkNonPropertiesFields(ObjectNode mappings) {
// remove `_meta` field
mappings.remove("_meta");
private static void checkNonPropertiesFields(ObjectNode mappings, List<String> arrayFields) {
// remove `_meta` field and parse array_fields
JsonNode metaNode = mappings.remove("_meta");
if (metaNode != null) {
JsonNode dorisMeta = metaNode.get("doris");
if (dorisMeta != null) {
JsonNode arrayNode = dorisMeta.get("array_fields");
if (arrayNode != null) {
Iterator<JsonNode> iterator = arrayNode.iterator();
while (iterator.hasNext()) {
arrayFields.add(iterator.next().asText());
}
}
}
}
// remove `dynamic_templates` field
mappings.remove("dynamic_templates");
// check explicit mapping
Expand All @@ -152,7 +164,7 @@ private static void checkNonPropertiesFields(ObjectNode mappings) {
**/
public static ObjectNode getMappingProps(String sourceIndex, String indexMapping, String mappingType) {
ObjectNode mappings = getMapping(indexMapping);
ObjectNode rootSchema = getRootSchema(mappings, mappingType);
ObjectNode rootSchema = getRootSchema(mappings, mappingType, new ArrayList<>());
ObjectNode properties = (ObjectNode) rootSchema.get("properties");
if (properties == null) {
throw new DorisEsException(
Expand All @@ -169,13 +181,15 @@ public static List<Column> genColumnsFromEs(EsRestClient client, String indexNam
boolean mappingEsId) {
String mapping = client.getMapping(indexName);
ObjectNode mappings = getMapping(mapping);
ObjectNode rootSchema = getRootSchema(mappings, mappingType);
return genColumnsFromEs(indexName, mappingType, rootSchema, mappingEsId);
// Get array_fields while removing _meta property.
List<String> arrayFields = new ArrayList<>();
ObjectNode rootSchema = getRootSchema(mappings, mappingType, arrayFields);
return genColumnsFromEs(indexName, mappingType, rootSchema, mappingEsId, arrayFields);
}

@VisibleForTesting
public static List<Column> genColumnsFromEs(String indexName, String mappingType, ObjectNode rootSchema,
boolean mappingEsId) {
boolean mappingEsId, List<String> arrayFields) {
List<Column> columns = new ArrayList<>();
if (mappingEsId) {
Column column = new Column();
Expand All @@ -191,14 +205,6 @@ public static List<Column> genColumnsFromEs(String indexName, String mappingType
throw new DorisEsException(
"index[" + indexName + "] type[" + mappingType + "] mapping not found for the ES Cluster");
}
List<String> arrayFields = new ArrayList<>();
JsonNode meta = mappingProps.get("_meta");
if (meta != null) {
JsonNode dorisMeta = meta.get("doris");
if (dorisMeta != null) {
arrayFields = dorisMeta.findValuesAsText("array_fields");
}
}
Iterator<String> iterator = mappingProps.fieldNames();
while (iterator.hasNext()) {
String fieldName = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ public void testEs8Mapping() throws IOException, URISyntaxException {
@Test
public void testDateType() throws IOException, URISyntaxException {
ObjectNode testDateFormat = EsUtil.getRootSchema(
EsUtil.getMapping(loadJsonFromFile("data/es/test_date_format.json")), null);
List<Column> parseColumns = EsUtil.genColumnsFromEs("test_date_format", null, testDateFormat, false);
EsUtil.getMapping(loadJsonFromFile("data/es/test_date_format.json")), null, new ArrayList<>());
List<Column> parseColumns = EsUtil.genColumnsFromEs("test_date_format", null, testDateFormat, false, new ArrayList<>());
Assertions.assertEquals(8, parseColumns.size());
for (Column column : parseColumns) {
String name = column.getName();
Expand Down