From a0b16e643c07b38c2168c132c36ae4a5252d4109 Mon Sep 17 00:00:00 2001
From: gnehil
Date: Fri, 27 Dec 2024 09:54:42 +0800
Subject: [PATCH] [feature](load) new ingestion load (#45937)
### What problem does this PR solve?
Problem Summary:
Ingestion Load loads pre-processed data into Doris. Preprocessing means that an
external system arranges the data according to the partitioning, bucketing, and
aggregation rules defined by the target Doris table and writes the result to an
external storage system. The BE then reads that data, converts it into segment
files, and stores them.

The basic flow is as follows:
1. An external system runs the ingestion ETL job.
2. LoadEtlChecker periodically checks the ETL job status and sends push tasks to the BEs once the job finishes.
3. LoadLoadingChecker periodically checks the loading status and commits the transaction once the push tasks finish.
4. PublishVersionDaemon sends publish-version tasks to the BEs and finishes the transaction.
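
For illustration, a minimal sketch of how a client could create an ingestion load through the new HTTP endpoint added in `LoadAction` (`POST /api/ingestion_load/{catalog}/{db}/_create`, internal catalog only). The FE host/port, database name, and credentials below are placeholders, and the exact JSON the FE returns is not shown in this patch excerpt; the request body follows the example documented in `LoadAction#createIngestionLoad`.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class CreateIngestionLoadExample {
    public static void main(String[] args) throws Exception {
        // Request body as documented in LoadAction#createIngestionLoad.
        String body = "{"
                + "\"label\": \"test\","
                + "\"tableToPartition\": {\"tbl_test_spark_load\": [\"p1\", \"p2\"]},"
                + "\"properties\": {\"strict_mode\": \"true\", \"timeout\": 3600000}"
                + "}";

        // Placeholder FE address, database, and credentials; only the internal catalog is accepted.
        URL url = new URL("http://fe_host:8030/api/ingestion_load/internal/test_db/_create");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setRequestProperty("Authorization", "Basic "
                + Base64.getEncoder().encodeToString("root:".getBytes(StandardCharsets.UTF_8)));
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // The FE replies with a JSON payload describing the created job.
        System.out.println("HTTP status: " + conn.getResponseCode());
    }
}
```

After the job is created, the external ETL job writes its output and the FE checkers drive the job through the steps above.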

### Release note
[feature](load) new ingestion load
(cherry picked from commit 6580f6bfbcea53efaf0f21ea556f781ce65b98b7)
---
be/src/olap/push_handler.cpp | 53 +-
.../java/org/apache/doris/common/Config.java | 4 +
.../apache/doris/sparkdpp/EtlJobConfig.java | 6 +-
.../apache/doris/catalog/SparkResource.java | 1 +
.../apache/doris/httpv2/rest/LoadAction.java | 211 +++
.../doris/httpv2/rest/RestBaseController.java | 1 +
.../org/apache/doris/load/EtlJobType.java | 1 +
.../doris/load/loadv2/IngestionLoadJob.java | 1139 +++++++++++++++++
.../org/apache/doris/load/loadv2/LoadJob.java | 5 +
.../apache/doris/load/loadv2/LoadManager.java | 50 +-
.../doris/load/loadv2/SparkEtlJobHandler.java | 1 +
.../load/loadv2/SparkLauncherMonitor.java | 1 +
.../doris/load/loadv2/SparkLoadAppHandle.java | 1 +
.../doris/load/loadv2/SparkLoadJob.java | 1 +
.../load/loadv2/SparkLoadPendingTask.java | 8 +-
.../loadv2/SparkPendingTaskAttachment.java | 1 +
.../doris/load/loadv2/SparkRepository.java | 1 +
.../load/loadv2/SparkYarnConfigFiles.java | 1 +
.../org/apache/doris/master/MasterImpl.java | 7 +-
.../apache/doris/persist/gson/GsonUtils.java | 9 +-
.../load/loadv2/SparkLoadPendingTaskTest.java | 26 +-
.../MinimumCoverageRollupTreeBuilderTest.java | 8 +-
.../load/loadv2/etl/SparkEtlJobTest.java | 4 +-
.../data/load_p0/ingestion_load/data.parquet | Bin 0 -> 5745 bytes
.../data/load_p0/ingestion_load/data1.parquet | Bin 0 -> 4057 bytes
.../load_p0/ingestion_load/data2-0.parquet | Bin 0 -> 851 bytes
.../load_p0/ingestion_load/data2-1.parquet | Bin 0 -> 781 bytes
.../load_p0/ingestion_load/data2-2.parquet | Bin 0 -> 781 bytes
.../load_p0/ingestion_load/data2-3.parquet | Bin 0 -> 839 bytes
.../ingestion_load/test_ingestion_load.out | 37 +
.../test_ingestion_load_multi_table.out | 25 +
...est_ingestion_load_with_inverted_index.out | 13 +
.../test_ingestion_load_with_partition.out | 7 +
.../ingestion_load/test_ingestion_load.groovy | 222 ++++
.../test_ingestion_load_alter_column.groovy | 208 +++
...test_ingestion_load_alter_partition.groovy | 224 ++++
.../test_ingestion_load_drop_table.groovy | 196 +++
.../test_ingestion_load_multi_table.groovy | 208 +++
..._ingestion_load_with_inverted_index.groovy | 166 +++
.../test_ingestion_load_with_partition.groovy | 160 +++
40 files changed, 2967 insertions(+), 39 deletions(-)
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/loadv2/IngestionLoadJob.java
create mode 100644 regression-test/data/load_p0/ingestion_load/data.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data1.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data2-0.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data2-1.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data2-2.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data2-3.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/test_ingestion_load.out
create mode 100644 regression-test/data/load_p0/ingestion_load/test_ingestion_load_multi_table.out
create mode 100644 regression-test/data/load_p0/ingestion_load/test_ingestion_load_with_inverted_index.out
create mode 100644 regression-test/data/load_p0/ingestion_load/test_ingestion_load_with_partition.out
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_column.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_partition.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_drop_table.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_multi_table.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_with_inverted_index.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_with_partition.groovy
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 575b002b2f6086..99568d47298aaa 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -34,15 +34,16 @@
#include
#include
#include
+#include
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
+#include "io/hdfs_builder.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/pending_rowset_helper.h"
-#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/schema.h"
@@ -53,10 +54,11 @@
#include "olap/txn_manager.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/simple_function_factory.h"
@@ -352,8 +354,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
_file_params.expr_of_dest_slot = _params.expr_of_dest_slot;
_file_params.dest_sid_to_src_sid_without_trans = _params.dest_sid_to_src_sid_without_trans;
_file_params.strict_mode = _params.strict_mode;
- _file_params.__isset.broker_addresses = true;
- _file_params.broker_addresses = t_scan_range.broker_addresses;
+ if (_ranges[0].file_type == TFileType::FILE_HDFS) {
+ _file_params.hdfs_params = parse_properties(_params.properties);
+ } else {
+ _file_params.__isset.broker_addresses = true;
+ _file_params.broker_addresses = t_scan_range.broker_addresses;
+ }
for (const auto& range : _ranges) {
TFileRangeDesc file_range;
@@ -482,17 +488,36 @@ Status PushBrokerReader::_cast_to_input_block() {
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// remove nullable here, let the get_function decide whether nullable
auto return_type = slot_desc->get_data_type_ptr();
- vectorized::ColumnsWithTypeAndName arguments {
- arg,
- {vectorized::DataTypeString().create_column_const(
- arg.column->size(), remove_nullable(return_type)->get_family_name()),
- std::make_shared<vectorized::DataTypeString>(), ""}};
- auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
- "CAST", arguments, return_type);
idx = _src_block_name_to_idx[slot_desc->col_name()];
- RETURN_IF_ERROR(
- func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
- _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ // bitmap convert:src -> to_base64 -> bitmap_from_base64
+ if (slot_desc->type().is_bitmap_type()) {
+ auto base64_return_type = vectorized::DataTypeFactory::instance().create_data_type(
+ vectorized::DataTypeString().get_type_as_type_descriptor(),
+ slot_desc->is_nullable());
+ auto func_to_base64 = vectorized::SimpleFunctionFactory::instance().get_function(
+ "to_base64", {arg}, base64_return_type);
+ RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
+ arg.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(base64_return_type);
+ auto& arg_base64 = _src_block_ptr->get_by_name(slot_desc->col_name());
+ auto func_bitmap_from_base64 =
+ vectorized::SimpleFunctionFactory::instance().get_function(
+ "bitmap_from_base64", {arg_base64}, return_type);
+ RETURN_IF_ERROR(func_bitmap_from_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
+ arg_base64.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ } else {
+ vectorized::ColumnsWithTypeAndName arguments {
+ arg,
+ {vectorized::DataTypeString().create_column_const(
+ arg.column->size(), remove_nullable(return_type)->get_family_name()),
+ std::make_shared<vectorized::DataTypeString>(), ""}};
+ auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
+ "CAST", arguments, return_type);
+ RETURN_IF_ERROR(
+ func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ }
}
return Status::OK();
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 01d981efdd97b4..a270501560d743 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -633,6 +633,10 @@ public class Config extends ConfigBase {
@ConfField(description = {"Yarn 配置文件的路径", "Yarn config path"})
public static String yarn_config_dir = System.getenv("DORIS_HOME") + "/lib/yarn-config";
+ @ConfField(mutable = true, masterOnly = true, description = {"Ingestion load 的默认超时时间,单位是秒。",
+ "Default timeout for ingestion load job, in seconds."})
+ public static int ingestion_load_default_timeout_second = 86400; // 1 day
+
@ConfField(mutable = true, masterOnly = true, description = {"Sync job 的最大提交间隔,单位是秒。",
"Maximal intervals between two sync job's commits."})
public static long sync_commit_interval_second = 10;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java b/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
index c59901d383b648..8d9d5de54b59f1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
@@ -371,14 +371,17 @@ public static class EtlIndex implements Serializable {
public String indexType;
@SerializedName(value = "isBaseIndex")
public boolean isBaseIndex;
+ @SerializedName(value = "schemaVersion")
+ public int schemaVersion;
public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
- String indexType, boolean isBaseIndex) {
+ String indexType, boolean isBaseIndex, int schemaVersion) {
this.indexId = indexId;
this.columns = etlColumns;
this.schemaHash = schemaHash;
this.indexType = indexType;
this.isBaseIndex = isBaseIndex;
+ this.schemaVersion = schemaVersion;
}
public EtlColumn getColumn(String name) {
@@ -398,6 +401,7 @@ public String toString() {
+ ", schemaHash=" + schemaHash
+ ", indexType='" + indexType + '\''
+ ", isBaseIndex=" + isBaseIndex
+ + ", schemaVersion=" + schemaVersion
+ '}';
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index 704d8e512d7f3c..2af2a9b4a90df2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -70,6 +70,7 @@
*
* DROP RESOURCE "spark0";
*/
+@Deprecated
public class SparkResource extends Resource {
private static final Logger LOG = LogManager.getLogger(SparkResource.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 2f9efc1ed1b1bf..b62ac7832e7a88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -27,13 +27,21 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
+import org.apache.doris.httpv2.rest.manager.HttpUtils;
+import org.apache.doris.load.FailMsg;
import org.apache.doris.load.StreamLoadHandler;
+import org.apache.doris.load.loadv2.IngestionLoadJob;
+import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.GroupCommitPlanner;
@@ -45,9 +53,14 @@
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.transaction.BeginTransactionException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -59,10 +72,14 @@
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
@@ -694,4 +711,198 @@ private Backend selectBackendForGroupCommit(String clusterName, HttpServletReque
}
return backend;
}
+
+ /**
+ * Request body example:
+ * {
+ * "label": "test",
+ * "tableToPartition": {
+ * "tbl_test_spark_load": ["p1","p2"]
+ * },
+ * "properties": {
+ * "strict_mode": "true",
+ * "timeout": 3600000
+ * }
+ * }
+ *
+ */
+ @RequestMapping(path = "/api/ingestion_load/{" + CATALOG_KEY + "}/{" + DB_KEY
+ + "}/_create", method = RequestMethod.POST)
+ public Object createIngestionLoad(HttpServletRequest request, HttpServletResponse response,
+ @PathVariable(value = CATALOG_KEY) String catalog,
+ @PathVariable(value = DB_KEY) String db) {
+ if (needRedirect(request.getScheme())) {
+ return redirectToHttps(request);
+ }
+
+ executeCheckPassword(request, response);
+
+ if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalog)) {
+ return ResponseEntityBuilder.okWithCommonError("Only support internal catalog. "
+ + "Current catalog is " + catalog);
+ }
+
+ Object redirectView = redirectToMaster(request, response);
+ if (redirectView != null) {
+ return redirectView;
+ }
+
+ String fullDbName = getFullDbName(db);
+
+ Map<String, Object> resultMap = new HashMap<>();
+
+ try {
+
+ String body = HttpUtils.getBody(request);
+ JsonMapper mapper = JsonMapper.builder().build();
+ JsonNode jsonNode = mapper.reader().readTree(body);
+
+ String label = jsonNode.get("label").asText();
+ Map<String, List<String>> tableToPartition = mapper.reader()
+ .readValue(jsonNode.get("tableToPartition").traverse(),
+ new TypeReference
+ * Load data file which has been pre-processed
+ *
+ * There are 4 steps in IngestionLoadJob:
+ * Step1: The external system runs the ingestion ETL job.
+ * Step2: LoadEtlChecker checks the ingestion ETL job status periodically
+ * and sends push tasks to the BEs when the ETL job is finished.
+ * Step3: LoadLoadingChecker checks the loading status periodically and commits the transaction when the push tasks are finished.
+ * Step4: PublishVersionDaemon sends publish version tasks to the BEs and finishes the transaction.
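+ *
+ * Jobs are created through the HTTP API (see LoadAction#createIngestionLoad) and use
+ * Config.ingestion_load_default_timeout_second as the default timeout.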
+ */
+public class IngestionLoadJob extends LoadJob {
+
+ public static final Logger LOG = LogManager.getLogger(IngestionLoadJob.class);
+
+ @Setter
+ @SerializedName("ests")
+ private EtlStatus etlStatus;
+
+ // members below updated when job state changed to loading
+ // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) }
+ @SerializedName(value = "tm2fi")
+ private final Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap();
+
+ @SerializedName(value = "hp")
+ private final Map<String, String> hadoopProperties = new HashMap<>();
+
+ @SerializedName(value = "i2sv")
+ private final Map<Long, Integer> indexToSchemaVersion = new HashMap<>();
+
+ private final Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
+
+ private final Map<String, Long> filePathToSize = new HashMap<>();
+
+ private final Set<Long> finishedReplicas = Sets.newHashSet();
+ private final Set<Long> quorumTablets = Sets.newHashSet();
+ private final Set<Long> fullTablets = Sets.newHashSet();
+
+ private final List<TabletCommitInfo> commitInfos = Lists.newArrayList();
+
+ private final Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
+
+ private final Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap();
+
+ private long etlStartTimestamp = -1;
+
+ private long quorumFinishTimestamp = -1;
+
+ private List<Long> loadTableIds = new ArrayList<>();
+
+ public IngestionLoadJob() {
+ super(EtlJobType.INGESTION);
+ }
+
+ public IngestionLoadJob(long dbId, String label, List<String> tableNames, UserIdentity userInfo)
+ throws LoadException {
+ super(EtlJobType.INGESTION, dbId, label);
+ this.loadTableIds = getLoadTableIds(tableNames);
+ this.userInfo = userInfo;
+ }
+
+ @Override
+ public Set<String> getTableNamesForShow() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getTableNames() throws MetaNotFoundException {
+ Set<String> result = Sets.newHashSet();
+ Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
+ for (long tableId : loadTableIds) {
+ Table table = database.getTableOrMetaException(tableId);
+ result.add(table.getName());
+ }
+ return result;
+ }
+
+ @Override
+ public void afterVisible(TransactionState txnState, boolean txnOperated) {
+ super.afterVisible(txnState, txnOperated);
+ clearJob();
+ }
+
+ @Override
+ public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
+ throws UserException {
+ super.afterAborted(txnState, txnOperated, txnStatusChangeReason);
+ clearJob();
+ }
+
+ @Override
+ public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
+ super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
+ clearJob();
+ }
+
+ @Override
+ public void cancelJob(FailMsg failMsg) throws DdlException {
+ super.cancelJob(failMsg);
+ clearJob();
+ }
+
+ private List<Long> getLoadTableIds(List<String> tableNames) throws LoadException {
+ Database db = Env.getCurrentInternalCatalog()
+ .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
+ List<Long> list = new ArrayList<>(tableNames.size());
+ for (String tableName : tableNames) {
+ OlapTable olapTable = (OlapTable) db.getTableOrException(tableName,
+ s -> new LoadException("table does not exist. id: " + s));
+ list.add(olapTable.getId());
+ }
+ return list;
+ }
+
+ @Override
+ protected long getEtlStartTimestamp() {
+ return etlStartTimestamp;
+ }
+
+ public long beginTransaction()
+ throws BeginTransactionException, MetaNotFoundException, AnalysisException, QuotaExceedException,
+ LabelAlreadyUsedException, DuplicatedRequestException {
+ this.transactionId = Env.getCurrentGlobalTransactionMgr()
+ .beginTransaction(dbId, loadTableIds, label, null,
+ new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(), ExecuteEnv.getInstance().getStartupTime()),
+ TransactionState.LoadJobSourceType.FRONTEND, id, getTimeout());
+ return transactionId;
+ }
+
+ public Map<String, Object> getLoadMeta(Map<String, List<String>> tableToPartitionMap)
+ throws LoadException {
+
+ if (tableToPartitionMap == null || tableToPartitionMap.isEmpty()) {
+ throw new IllegalArgumentException("tableToPartitionMap is empty");
+ }
+
+ Database db = Env.getCurrentInternalCatalog()
+ .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
+ Map<String, Object> loadMeta = new HashMap<>();
+ loadMeta.put("dbId", db.getId());
+ Long signature = Env.getCurrentEnv().getNextId();
+ loadMeta.put("signature", signature);
+
+ List<Table> tables;
+ try {
+ tables = db.getTablesOnIdOrderOrThrowException(loadTableIds);
+ } catch (MetaNotFoundException e) {
+ throw new LoadException(e.getMessage());
+ }
+
+ MetaLockUtils.readLockTables(tables);
+ try {
+ Map<String, Map<String, Object>> tableMeta = new HashMap<>(tableToPartitionMap.size());
+ for (Map.Entry<String, List<String>> entry : tableToPartitionMap.entrySet()) {
+ String tableName = entry.getKey();
+ Map<String, Object> meta = tableMeta.getOrDefault(tableName, new HashMap<>());
+ OlapTable olapTable = (OlapTable) db.getTableOrException(tableName,
+ s -> new LoadException("table does not exist. id: " + s));
+ meta.put("id", olapTable.getId());
+ List<EtlJobConfig.EtlIndex> indices = createEtlIndexes(olapTable);
+ meta.put("indexes", indices);
+ List<String> partitionNames = entry.getValue();
+ Set<Long> partitionIds;
+ if (partitionNames != null && !partitionNames.isEmpty()) {
+ partitionIds = new HashSet<>(partitionNames.size());
+ for (String partitionName : partitionNames) {
+ Partition partition = olapTable.getPartition(partitionName);
+ if (partition == null) {
+ throw new LoadException(String.format("partition %s does not exist", partitionName));
+ }
+ partitionIds.add(partition.getId());
+ }
+ } else {
+ partitionIds =
+ olapTable.getAllPartitions().stream().map(Partition::getId).collect(Collectors.toSet());
+ }
+ EtlJobConfig.EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(olapTable, partitionIds);
+ meta.put("partitionInfo", etlPartitionInfo);
+ tableMeta.put(tableName, meta);
+
+ if (tableToLoadPartitions.containsKey(olapTable.getId())) {
+ tableToLoadPartitions.get(olapTable.getId()).addAll(partitionIds);
+ } else {
+ tableToLoadPartitions.put(olapTable.getId(), partitionIds);
+ }
+
+ }
+ loadMeta.put("tableMeta", tableMeta);
+ } finally {
+ MetaLockUtils.readUnlockTables(tables);
+ }
+ return loadMeta;
+
+ }
+
+ private List<EtlJobConfig.EtlIndex> createEtlIndexes(OlapTable table) throws LoadException {
+ List<EtlJobConfig.EtlIndex> etlIndexes = Lists.newArrayList();
+
+ for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) {
+ long indexId = entry.getKey();
+ // todo(liheng): get schema hash and version from materialized index meta directly
+ MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(indexId);
+ int schemaHash = indexMeta.getSchemaHash();
+ int schemaVersion = indexMeta.getSchemaVersion();
+
+ boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
+ && table.getTableProperty().getEnableUniqueKeyMergeOnWrite();
+
+ // columns
+ List<EtlJobConfig.EtlColumn> etlColumns = Lists.newArrayList();
+ for (Column column : entry.getValue()) {
+ etlColumns.add(createEtlColumn(column, changeAggType));
+ }
+
+ // check distribution type
+ DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
+ if (distributionInfo.getType() != DistributionInfo.DistributionInfoType.HASH) {
+ // RANDOM not supported
+ String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name();
+ LOG.warn(errMsg);
+ throw new LoadException(errMsg);
+ }
+
+ // index type
+ String indexType;
+ KeysType keysType = table.getKeysTypeByIndexId(indexId);
+ switch (keysType) {
+ case DUP_KEYS:
+ indexType = "DUPLICATE";
+ break;
+ case AGG_KEYS:
+ indexType = "AGGREGATE";
+ break;
+ case UNIQUE_KEYS:
+ indexType = "UNIQUE";
+ break;
+ default:
+ String errMsg = "unknown keys type. type: " + keysType.name();
+ LOG.warn(errMsg);
+ throw new LoadException(errMsg);
+ }
+
+ indexToSchemaVersion.put(indexId, schemaVersion);
+
+ etlIndexes.add(new EtlJobConfig.EtlIndex(indexId, etlColumns, schemaHash, indexType,
+ indexId == table.getBaseIndexId(), schemaVersion));
+ }
+
+ return etlIndexes;
+ }
+
+ private EtlJobConfig.EtlColumn createEtlColumn(Column column, boolean changeAggType) {
+ // column name
+ String name = column.getName().toLowerCase(Locale.ROOT);
+ // column type
+ PrimitiveType type = column.getDataType();
+ String columnType = column.getDataType().toString();
+ // is allow null
+ boolean isAllowNull = column.isAllowNull();
+ // is key
+ boolean isKey = column.isKey();
+
+ // aggregation type
+ String aggregationType = null;
+ if (column.getAggregationType() != null) {
+ if (changeAggType && !column.isKey()) {
+ aggregationType = AggregateType.REPLACE.toSql();
+ } else {
+ aggregationType = column.getAggregationType().toString();
+ }
+ }
+
+ // default value
+ String defaultValue = null;
+ if (column.getDefaultValue() != null) {
+ defaultValue = column.getDefaultValue();
+ }
+ if (column.isAllowNull() && column.getDefaultValue() == null) {
+ defaultValue = "\\N";
+ }
+
+ // string length
+ int stringLength = 0;
+ if (type.isStringType()) {
+ stringLength = column.getStrLen();
+ }
+
+ // decimal precision scale
+ int precision = 0;
+ int scale = 0;
+ if (type.isDecimalV2Type() || type.isDecimalV3Type()) {
+ precision = column.getPrecision();
+ scale = column.getScale();
+ }
+
+ return new EtlJobConfig.EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue,
+ stringLength, precision, scale);
+ }
+
+ private EtlJobConfig.EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set<Long> partitionIds)
+ throws LoadException {
+ PartitionType type = table.getPartitionInfo().getType();
+
+ List<String> partitionColumnRefs = Lists.newArrayList();
+ List<EtlJobConfig.EtlPartition> etlPartitions = Lists.newArrayList();
+ if (type == PartitionType.RANGE) {
+ RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
+ for (Column column : rangePartitionInfo.getPartitionColumns()) {
+ partitionColumnRefs.add(column.getName());
+ }
+
+ for (Map.Entry<Long, PartitionItem> entry : rangePartitionInfo.getAllPartitionItemEntryList(true)) {
+ long partitionId = entry.getKey();
+ if (!partitionIds.contains(partitionId)) {
+ continue;
+ }
+
+ Partition partition = table.getPartition(partitionId);
+ if (partition == null) {
+ throw new LoadException("partition does not exist. id: " + partitionId);
+ }
+
+ // bucket num
+ int bucketNum = partition.getDistributionInfo().getBucketNum();
+
+ // is max partition
+ Range<PartitionKey> range = entry.getValue().getItems();
+ boolean isMaxPartition = range.upperEndpoint().isMaxValue();
+
+ // start keys
+ List<LiteralExpr> rangeKeyExprs = range.lowerEndpoint().getKeys();
+ List