diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index c63886d5ea8684..ceba0b4cc3f021 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -584,7 +584,7 @@ class RuntimeState { #define RETURN_IF_CANCELLED(state) \ do { \ - if (UNLIKELY((state)->is_cancelled())) return Status(TStatusCode::CANCELLED); \ + if (UNLIKELY((state)->is_cancelled())) return Status::CANCELLED; \ } while (false) } diff --git a/docs/user_guide/sql_reference.md b/docs/user_guide/sql_reference.md index 8577b0076fa008..175e0c1887a912 100644 --- a/docs/user_guide/sql_reference.md +++ b/docs/user_guide/sql_reference.md @@ -1223,7 +1223,7 @@ load_label是当前导入批次的标签,由用户指定,需要保证在一 - file_path,broker中的文件路径,可以指定到一个文件,也可以用/*通配符指定某个目录下的所有文件。 -- NEGATIVE:如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。该参数仅适用于存在value列,并且value列的聚合类型为SUM的情况。 +- NEGATIVE:如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。该参数仅适用于存在value列,并且value列的聚合类型为SUM的情况。不支持Broker方式导入。 - PARTITION:如果指定此参数,则只会导入指定的分区,导入分区以外的数据会被过滤掉。如果不指定,默认导入table的所有分区。 @@ -1291,9 +1291,9 @@ load_label是当前导入批次的标签,由用户指定,需要保证在一 - max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。 -- load_delete_flag:指定该导入是否通过导入key列的方式删除数据,仅适用于UNIQUE KEY,导入时可不指定value列。默认为false - +- load_delete_flag:指定该导入是否通过导入key列的方式删除数据,仅适用于UNIQUE KEY,导入时可不指定value列。默认为false (不支持Broker方式导入) +- exec_mem_limit:在Broker Load方式时生效,指定导入执行时,后端可使用的最大内存。 举例: diff --git a/fe/src/com/baidu/palo/analysis/LoadStmt.java b/fe/src/com/baidu/palo/analysis/LoadStmt.java index 2e5781a92d803f..8f1e3556bb3355 100644 --- a/fe/src/com/baidu/palo/analysis/LoadStmt.java +++ b/fe/src/com/baidu/palo/analysis/LoadStmt.java @@ -62,6 +62,7 @@ public class LoadStmt extends DdlStmt { public static final String TIMEOUT_PROPERTY = "timeout"; public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio"; public static final String LOAD_DELETE_FLAG_PROPERTY = "load_delete_flag"; + public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; public static final String CLUSTER_PROPERTY = 
"cluster"; // for load data from Baidu Object Store(BOS) @@ -127,6 +128,7 @@ public static void checkProperties(Map properties) throws DdlExc propertySet.add(LoadStmt.TIMEOUT_PROPERTY); propertySet.add(LoadStmt.MAX_FILTER_RATIO_PROPERTY); propertySet.add(LoadStmt.LOAD_DELETE_FLAG_PROPERTY); + propertySet.add(LoadStmt.EXEC_MEM_LIMIT); propertySet.add(LoadStmt.CLUSTER_PROPERTY); for (Entry entry : properties.entrySet()) { diff --git a/fe/src/com/baidu/palo/common/FeConstants.java b/fe/src/com/baidu/palo/common/FeConstants.java index 1f36e71137ab01..a1bc669776c7c4 100644 --- a/fe/src/com/baidu/palo/common/FeConstants.java +++ b/fe/src/com/baidu/palo/common/FeConstants.java @@ -38,5 +38,5 @@ public class FeConstants { // general model // Current meta data version. Use this version to write journals and image - public static int meta_version = FeMetaVersion.VERSION_33; + public static int meta_version = FeMetaVersion.VERSION_34; } diff --git a/fe/src/com/baidu/palo/common/FeMetaVersion.java b/fe/src/com/baidu/palo/common/FeMetaVersion.java index c52603475849d3..19379133b859a0 100644 --- a/fe/src/com/baidu/palo/common/FeMetaVersion.java +++ b/fe/src/com/baidu/palo/common/FeMetaVersion.java @@ -65,4 +65,7 @@ public final class FeMetaVersion { // persist decommission type public static final int VERSION_33 = 33; + + // persist LoadJob's execMemLimit + public static final int VERSION_34 = 34; } diff --git a/fe/src/com/baidu/palo/load/Load.java b/fe/src/com/baidu/palo/load/Load.java index 21a01f6100817f..ce606943330b9e 100644 --- a/fe/src/com/baidu/palo/load/Load.java +++ b/fe/src/com/baidu/palo/load/Load.java @@ -428,9 +428,11 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType, job.setDbId(db.getId()); job.setTimestamp(timestamp); job.setBrokerDesc(stmt.getBrokerDesc()); + // resource info if (ConnectContext.get() != null) { job.setResourceInfo(ConnectContext.get().toResourceCtx()); + 
job.setExecMemLimit(ConnectContext.get().getSessionVariable().getMaxExecMemByte()); } // job properties @@ -459,6 +461,14 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType, throw new DdlException("Value of delete flag is invalid"); } } + + if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) { + try { + job.setExecMemLimit(Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT))); + } catch (NumberFormatException e) { + throw new DdlException("Execute memory limit is not Long", e); + } + } } // job table load info diff --git a/fe/src/com/baidu/palo/load/LoadJob.java b/fe/src/com/baidu/palo/load/LoadJob.java index 665bbb31a36955..da5dafd68136da 100644 --- a/fe/src/com/baidu/palo/load/LoadJob.java +++ b/fe/src/com/baidu/palo/load/LoadJob.java @@ -62,7 +62,8 @@ public enum EtlJobType { private static final int DEFAULT_TIMEOUT_S = 0; private static final double DEFAULT_MAX_FILTER_RATIO = 0; - + private static final long DEFAULT_EXEC_MEM_LIMIT = 2147483648L; // 2GB + private long id; private long dbId; private String label; @@ -103,6 +104,8 @@ public enum EtlJobType { private TPriority priority; + private long execMemLimit; + public LoadJob() { this(""); } @@ -137,6 +140,7 @@ public LoadJob(String label, int timeoutSecond, double maxFilterRatio) { this.replicaPersistInfos = Maps.newHashMap(); this.resourceInfo = null; this.priority = TPriority.NORMAL; + this.execMemLimit = DEFAULT_EXEC_MEM_LIMIT; } public long getId() { @@ -275,6 +279,10 @@ public PullLoadSourceInfo getPullLoadSourceInfo() { return pullLoadSourceInfo; } + public void setExecMemLimit(long execMemLimit) { this.execMemLimit = execMemLimit; } + + public long getExecMemLimit() { return execMemLimit; } + public void setEtlJobType(EtlJobType etlJobType) { this.etlJobType = etlJobType; switch (etlJobType) { @@ -605,6 +613,8 @@ public void write(DataOutput out) throws IOException { out.writeBoolean(true); pullLoadSourceInfo.write(out); } + + out.writeLong(execMemLimit); } public void 
readFields(DataInput in) throws IOException { @@ -713,6 +723,10 @@ public void readFields(DataInput in) throws IOException { this.pullLoadSourceInfo = PullLoadSourceInfo.read(in); } } + + if (version >= FeMetaVersion.VERSION_34) { + this.execMemLimit = in.readLong(); + } } @Override diff --git a/fe/src/com/baidu/palo/qe/Coordinator.java b/fe/src/com/baidu/palo/qe/Coordinator.java index 08dd031d272315..9c41e10508830a 100644 --- a/fe/src/com/baidu/palo/qe/Coordinator.java +++ b/fe/src/com/baidu/palo/qe/Coordinator.java @@ -224,6 +224,14 @@ public String getTrackingUrl() { return trackingUrl; } + public void setExecMemoryLimit(long execMemoryLimit) { + this.queryOptions.setMem_limit(execMemoryLimit); + } + + public void setTimeout(int timeout) { + this.queryOptions.setQuery_timeout(timeout); + } + // Initiate private void prepare() { for (PlanFragment fragment : fragments) { diff --git a/fe/src/com/baidu/palo/task/LoadEtlTask.java b/fe/src/com/baidu/palo/task/LoadEtlTask.java index 1dbd8b3ba5c59d..10a4a4f312c60a 100644 --- a/fe/src/com/baidu/palo/task/LoadEtlTask.java +++ b/fe/src/com/baidu/palo/task/LoadEtlTask.java @@ -1,18 +1,18 @@ -// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package com.baidu.palo.task; import com.baidu.palo.catalog.Catalog; @@ -64,6 +64,10 @@ public LoadEtlTask(LoadJob job) { this.load = Catalog.getInstance().getLoadInstance(); } + protected String getErrorMsg() { + return "etl job fail"; + } + @Override protected void exec() { // check job state @@ -117,7 +121,7 @@ private void updateEtlStatus() throws LoadException { processEtlFinished(); break; case CANCELLED: - throw new LoadException("etl job fail"); + throw new LoadException(getErrorMsg()); case RUNNING: processEtlRunning(); break; diff --git a/fe/src/com/baidu/palo/task/PullLoadEtlTask.java b/fe/src/com/baidu/palo/task/PullLoadEtlTask.java index 1c78b5d1c858b1..7948c3b3a0a770 100644 --- a/fe/src/com/baidu/palo/task/PullLoadEtlTask.java +++ b/fe/src/com/baidu/palo/task/PullLoadEtlTask.java @@ -35,6 +35,21 @@ public PullLoadEtlTask(LoadJob job) { mgr = Catalog.getInstance().getPullLoadJobMgr(); } + @Override + protected String getErrorMsg() { + String errMsg = null; + PullLoadJob pullLoadJob = mgr.getJob(job.getId()); + if (pullLoadJob != null) { + PullLoadTask failureTask = pullLoadJob.getFailureTask(); + if (failureTask != null) { + if (failureTask.getExecuteStatus() != null) { + errMsg = "Broker etl failed: " + failureTask.getExecuteStatus().getErrorMsg(); + } + } + } + return errMsg != null ? 
errMsg : super.getErrorMsg(); + } + @Override protected boolean updateJobEtlStatus() { PullLoadJob pullLoadJob = mgr.getJob(job.getId()); diff --git a/fe/src/com/baidu/palo/task/PullLoadJob.java b/fe/src/com/baidu/palo/task/PullLoadJob.java index eaeceecbc168fb..9ff77d3f51347c 100644 --- a/fe/src/com/baidu/palo/task/PullLoadJob.java +++ b/fe/src/com/baidu/palo/task/PullLoadJob.java @@ -77,6 +77,10 @@ public synchronized void cancel() { } } + public PullLoadTask getFailureTask() { + return failureTask; + } + public synchronized void onTaskFinished(PullLoadTask task) { int taskId = task.taskId; if (!state.isRunning()) { diff --git a/fe/src/com/baidu/palo/task/PullLoadPendingTask.java b/fe/src/com/baidu/palo/task/PullLoadPendingTask.java index f639217fa6bdae..708bba097e5129 100644 --- a/fe/src/com/baidu/palo/task/PullLoadPendingTask.java +++ b/fe/src/com/baidu/palo/task/PullLoadPendingTask.java @@ -62,7 +62,7 @@ protected void createEtlRequest() throws Exception { // Generate pull load task, one PullLoadTask task = new PullLoadTask( job.getId(), nextTaskId, db, table, - job.getBrokerDesc(), entry.getValue(), jobDeadlineMs); + job.getBrokerDesc(), entry.getValue(), jobDeadlineMs, job.getExecMemLimit()); task.init(); pullLoadTaskList.add(task); nextTaskId++; diff --git a/fe/src/com/baidu/palo/task/PullLoadTask.java b/fe/src/com/baidu/palo/task/PullLoadTask.java index 7af46df86c6850..e018bbb14eb96d 100644 --- a/fe/src/com/baidu/palo/task/PullLoadTask.java +++ b/fe/src/com/baidu/palo/task/PullLoadTask.java @@ -22,6 +22,7 @@ import com.baidu.palo.common.InternalException; import com.baidu.palo.common.Status; import com.baidu.palo.load.BrokerFileGroup; +import com.baidu.palo.load.LoadJob; import com.baidu.palo.qe.Coordinator; import com.baidu.palo.qe.QeProcessor; import com.baidu.palo.thrift.TQueryType; @@ -55,6 +56,7 @@ public class PullLoadTask { private Map fileMap; private String trackingUrl; private Map counters; + private final long execMemLimit; // Runtime variables 
private enum State { @@ -74,7 +76,7 @@ public PullLoadTask( long jobId, int taskId, Database db, OlapTable table, BrokerDesc brokerDesc, List fileGroups, - long jobDeadlineMs) { + long jobDeadlineMs, long execMemLimit) { this.jobId = jobId; this.taskId = taskId; this.db = db; @@ -82,6 +84,7 @@ public PullLoadTask( this.brokerDesc = brokerDesc; this.fileGroups = fileGroups; this.jobDeadlineMs = jobDeadlineMs; + this.execMemLimit = execMemLimit; } public void init() throws InternalException { @@ -117,7 +120,7 @@ public synchronized boolean isFinished() { } public Status getExecuteStatus() { - return null; + return executeStatus; } public synchronized void onCancelled() { @@ -201,6 +204,7 @@ public void executeOnce() throws InternalException { curCoordinator = new Coordinator(executeId, planner.getDescTable(), planner.getFragments(), planner.getScanNodes()); curCoordinator.setQueryType(TQueryType.LOAD); + curCoordinator.setExecMemoryLimit(execMemLimit); } boolean needUnregister = false;