From 60abb3afa489f49de3c989168947fda30d17bbed Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 4 Sep 2017 16:47:52 +0800 Subject: [PATCH 1/5] modify modify the way of PartitionDesc analysis. fix set global variables bug --- .../palo/alter/DecommissionBackendJob.java | 2 +- .../com/baidu/palo/alter/SchemaChangeJob.java | 3 +- fe/src/com/baidu/palo/analysis/Analyzer.java | 5 +++ .../baidu/palo/analysis/CreateTableStmt.java | 12 ++++++- .../palo/analysis/ExprSubstitutionMap.java | 4 +-- .../baidu/palo/analysis/PartitionDesc.java | 6 +++- .../palo/analysis/RangePartitionDesc.java | 32 +++++++++++-------- fe/src/com/baidu/palo/analysis/SetStmt.java | 2 ++ .../palo/analysis/ShowVariablesStmt.java | 2 +- .../com/baidu/palo/catalog/InfoSchemaDb.java | 11 +++++++ fe/src/com/baidu/palo/qe/VariableMgr.java | 3 ++ 11 files changed, 61 insertions(+), 21 deletions(-) diff --git a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java index 5c582951ef46fd..ddad005e96bc2d 100644 --- a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java +++ b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java @@ -485,7 +485,7 @@ public synchronized int tryFinishJob() { } } } else { - // Shrinking capacity in cluser + // Shrinking capacity in cluster if (decommissionType == DecommissionType.ClusterDecommission) { for (String clusterName : clusterBackendsMap.keySet()) { final Map idToBackend = clusterBackendsMap.get(clusterName); diff --git a/fe/src/com/baidu/palo/alter/SchemaChangeJob.java b/fe/src/com/baidu/palo/alter/SchemaChangeJob.java index 9bb652aa01fcb0..254aea3303ddcd 100644 --- a/fe/src/com/baidu/palo/alter/SchemaChangeJob.java +++ b/fe/src/com/baidu/palo/alter/SchemaChangeJob.java @@ -829,10 +829,9 @@ public void unprotectedReplayCancel(Database db) { } for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - if (replica.getState() == ReplicaState.CLONE) { + if (replica.getState() == ReplicaState.CLONE || replica.getState() == ReplicaState.NORMAL) { continue; } - Preconditions.checkState(replica.getState() == ReplicaState.SCHEMA_CHANGE, replica.getState()); replica.setState(ReplicaState.NORMAL); } // end for replicas } // end for tablets diff --git a/fe/src/com/baidu/palo/analysis/Analyzer.java b/fe/src/com/baidu/palo/analysis/Analyzer.java index 227632d73425d5..379a2e86650582 100644 --- a/fe/src/com/baidu/palo/analysis/Analyzer.java +++ b/fe/src/com/baidu/palo/analysis/Analyzer.java @@ -24,6 +24,7 @@ import com.baidu.palo.catalog.Catalog; import com.baidu.palo.catalog.Column; import com.baidu.palo.catalog.Database; +import com.baidu.palo.catalog.InfoSchemaDb; import com.baidu.palo.catalog.Table; import com.baidu.palo.cluster.ClusterNamespace; import com.baidu.palo.catalog.Type; @@ -526,6 +527,10 @@ public SlotDescriptor registerColumnRef(TableName tblName, String colName) throw if (tblName == null) { d = resolveColumnRef(colName); } else { + if (InfoSchemaDb.isInfoSchemaDb(tblName.getDb()) || + (tblName.getDb() == null && InfoSchemaDb.isInfoSchemaDb(getDefaultDb()))) { + tblName = new TableName(tblName.getDb(), tblName.getTbl().toLowerCase()); + } d = resolveColumnRef(tblName, colName); } if (d == null && hasAncestors() && isSubquery) { diff --git a/fe/src/com/baidu/palo/analysis/CreateTableStmt.java b/fe/src/com/baidu/palo/analysis/CreateTableStmt.java index cb7f90d9b93713..d5a15fc15bac7d 100644 --- a/fe/src/com/baidu/palo/analysis/CreateTableStmt.java +++ 
b/fe/src/com/baidu/palo/analysis/CreateTableStmt.java @@ -25,6 +25,7 @@ import com.baidu.palo.catalog.Catalog; import com.baidu.palo.catalog.Column; import com.baidu.palo.catalog.KeysType; +import com.baidu.palo.catalog.PartitionType; import com.baidu.palo.common.AnalysisException; import com.baidu.palo.common.ErrorCode; import com.baidu.palo.common.ErrorReport; @@ -279,7 +280,16 @@ public void analyze(Analyzer analyzer) throws AnalysisException, InternalExcepti if (engineName.equals("olap")) { // analyze partition if (partitionDesc != null) { - partitionDesc.analyze(columnSet, properties); + if (partitionDesc.getType() != PartitionType.RANGE) { + throw new AnalysisException("Currently only support range partition with engine type olap"); + } + + RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc; + if (rangePartitionDesc.getPartitionColNames().size() != 1) { + throw new AnalysisException("Only allow partitioned by one column"); + } + + rangePartitionDesc.analyze(columns, properties); } // analyze distribution diff --git a/fe/src/com/baidu/palo/analysis/ExprSubstitutionMap.java b/fe/src/com/baidu/palo/analysis/ExprSubstitutionMap.java index 1a5c6c131a9359..f74f2f7d94b606 100644 --- a/fe/src/com/baidu/palo/analysis/ExprSubstitutionMap.java +++ b/fe/src/com/baidu/palo/analysis/ExprSubstitutionMap.java @@ -49,9 +49,9 @@ public ExprSubstitutionMap() { } // Only used to convert show statement to select statement - public ExprSubstitutionMap(boolean check_analyzed) { + public ExprSubstitutionMap(boolean checkAnalyzed) { this(Lists.newArrayList(), Lists.newArrayList()); - this.checkAnalyzed_ = false; + this.checkAnalyzed_ = checkAnalyzed; } public ExprSubstitutionMap(List lhs, List rhs) { diff --git a/fe/src/com/baidu/palo/analysis/PartitionDesc.java b/fe/src/com/baidu/palo/analysis/PartitionDesc.java index 05d3524a4e4a68..f9846481934e9a 100644 --- a/fe/src/com/baidu/palo/analysis/PartitionDesc.java +++ b/fe/src/com/baidu/palo/analysis/PartitionDesc.java @@ -44,10 +44,14 @@ public class PartitionDesc implements Writable { public PartitionDesc() { } - public void analyze(Set colSet, Map otherProperties) throws AnalysisException { + public void analyze(List cols, Map otherProperties) throws AnalysisException { throw new NotImplementedException(); } + public PartitionType getType() { + return type; + } + public String toSql() { throw new NotImplementedException(); } diff --git a/fe/src/com/baidu/palo/analysis/RangePartitionDesc.java b/fe/src/com/baidu/palo/analysis/RangePartitionDesc.java index 951f17852d6496..6b13c075ab5b3b 100644 --- a/fe/src/com/baidu/palo/analysis/RangePartitionDesc.java +++ b/fe/src/com/baidu/palo/analysis/RangePartitionDesc.java @@ -70,29 +70,36 @@ public List getPartitionColNames() { } @Override - public void analyze(Set cols, Map otherProperties) throws AnalysisException { + public void analyze(List cols, Map otherProperties) throws AnalysisException { if (partitionColNames == null || partitionColNames.isEmpty()) { throw new AnalysisException("No partition columns."); } - if (partitionColNames.size() != 1) { - throw new AnalysisException("Only allow partitioned by one column"); - } - + Set partColNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); for (String partitionCol : partitionColNames) { - // use this to infer user which columns not exist - if (!cols.contains(partitionCol)) { - throw new AnalysisException("Partition column[" + partitionCol + "] does not exist."); + if (!partColNames.add(partitionCol)) { + throw new 
AnalysisException("Duplicated partition column " + partitionCol); + } + + boolean found = false; + for (Column col : cols) { + if (col.getName().equals(partitionCol)) { + if (!col.isKey()) { + throw new AnalysisException("Only key column can be partition column"); + } + found = true; + break; + } } - - if (partitionCol.equals(PrimitiveType.HLL.toString())) { - throw new AnalysisException("Partition column[" + partitionCol + "] can't be HLL."); + + if (!found) { + throw new AnalysisException("Partition column[" + partitionCol + "] does not exist in column list."); } } Set nameSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); for (SingleRangePartitionDesc desc : singleRangePartitionDescs) { - if (nameSet.contains(desc.getPartitionName())) { + if (!nameSet.add(desc.getPartitionName())) { throw new AnalysisException("Duplicated partition name: " + desc.getPartitionName()); } // in create table stmt, we use given properties @@ -102,7 +109,6 @@ public void analyze(Set cols, Map otherProperties) throw givenProperties = Maps.newHashMap(otherProperties); } desc.analyze(cols.size(), givenProperties); - nameSet.add(desc.getPartitionName()); } } diff --git a/fe/src/com/baidu/palo/analysis/SetStmt.java b/fe/src/com/baidu/palo/analysis/SetStmt.java index adbf1a4e42aefe..6b416e15b8e515 100644 --- a/fe/src/com/baidu/palo/analysis/SetStmt.java +++ b/fe/src/com/baidu/palo/analysis/SetStmt.java @@ -75,6 +75,8 @@ public RedirectStatus getRedirectStatus() { for (SetVar var : setVars) { if (var instanceof SetPassVar) { return RedirectStatus.FORWARD_WITH_SYNC; + } else if (var.getType() == SetType.GLOBAL) { + return RedirectStatus.FORWARD_WITH_SYNC; } } } diff --git a/fe/src/com/baidu/palo/analysis/ShowVariablesStmt.java b/fe/src/com/baidu/palo/analysis/ShowVariablesStmt.java index 5ecef8fea9bbef..47a0f2af102300 100644 --- a/fe/src/com/baidu/palo/analysis/ShowVariablesStmt.java +++ b/fe/src/com/baidu/palo/analysis/ShowVariablesStmt.java @@ -78,7 +78,7 @@ public SelectStmt toSelectStmt(Analyzer analyzer) { analyze(analyzer); // Columns SelectList selectList = new SelectList(); - ExprSubstitutionMap aliasMap = new ExprSubstitutionMap(); + ExprSubstitutionMap aliasMap = new ExprSubstitutionMap(false); TableName tableName = null; if (type == SetType.GLOBAL) { tableName = new TableName(InfoSchemaDb.DATABASE_NAME, "GLOBAL_VARIABLES"); diff --git a/fe/src/com/baidu/palo/catalog/InfoSchemaDb.java b/fe/src/com/baidu/palo/catalog/InfoSchemaDb.java index c1afb28108a7d5..18e3db34da19d7 100644 --- a/fe/src/com/baidu/palo/catalog/InfoSchemaDb.java +++ b/fe/src/com/baidu/palo/catalog/InfoSchemaDb.java @@ -86,4 +86,15 @@ public Table getTable(String name) { public static String getFullInfoSchemaDbName(String cluster) { return ClusterNamespace.getFullName(cluster, DATABASE_NAME); } + + public static boolean isInfoSchemaDb(String dbName) { + if (dbName == null) { + return false; + } + String[] ele = dbName.split(ClusterNamespace.CLUSTER_DELIMITER); + if (ele.length == 2) { + dbName = ele[1]; + } + return DATABASE_NAME.equalsIgnoreCase(dbName); + } } diff --git a/fe/src/com/baidu/palo/qe/VariableMgr.java b/fe/src/com/baidu/palo/qe/VariableMgr.java index b3c2fc1f4e372e..d57bd123ee6da5 100644 --- a/fe/src/com/baidu/palo/qe/VariableMgr.java +++ b/fe/src/com/baidu/palo/qe/VariableMgr.java @@ -334,6 +334,9 @@ public static void replayGlobalVariable(SessionVariable variable) throws IOExcep ctx = ctxByVarName.get(SessionVariable.MAX_ALLOWED_PACKET); setValue(ctx.getObj(), ctx.getField(), 
String.valueOf(variable.getMaxAllowedPacket())); + ctx = ctxByVarName.get(SessionVariable.EXEC_MEM_LIMIT); + setValue(ctx.getObj(), ctx.getField(), String.valueOf(variable.getMaxExecMemByte())); + } finally { wlock.unlock(); } From 78669b490ffc18fa8b38e35000cc601d22447e6c Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 4 Sep 2017 17:11:36 +0800 Subject: [PATCH 2/5] add export job statitic proc --- fe/src/com/baidu/palo/catalog/Catalog.java | 34 ++++++++++++------- .../baidu/palo/common/proc/JobsProcDir.java | 14 +++++--- .../baidu/palo/http/action/IndexAction.java | 5 ++- .../baidu/palo/http/action/SessionAction.java | 2 +- fe/src/com/baidu/palo/load/ExportMgr.java | 19 ++++++++--- 5 files changed, 50 insertions(+), 24 deletions(-) diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java index 0ddcc37ebd3b26..fcb12e883fba9b 100644 --- a/fe/src/com/baidu/palo/catalog/Catalog.java +++ b/fe/src/com/baidu/palo/catalog/Catalog.java @@ -122,7 +122,6 @@ import com.baidu.palo.persist.DropLinkDbAndUpdateDbInfo; import com.baidu.palo.persist.DropPartitionInfo; import com.baidu.palo.persist.EditLog; -import com.baidu.palo.persist.LinkDbInfo; import com.baidu.palo.persist.ModifyPartitionInfo; import com.baidu.palo.persist.PartitionPersistInfo; import com.baidu.palo.persist.RecoverInfo; @@ -162,6 +161,7 @@ import com.sleepycat.je.rep.InsufficientLogException; import com.sleepycat.je.rep.NetworkRestore; import com.sleepycat.je.rep.NetworkRestoreConfig; + import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.CreateTableOptions; @@ -535,17 +535,27 @@ private void getClusterIdAndRole() throws IOException { storage = new Storage(IMAGE_DIR); clusterId = storage.getClusterID(); } - } else { // Designate one helper node. Get the roll and version info - // from the helper node - role = getFeNodeType(); - if (role == FrontendNodeType.REPLICA) { - // for compatibility - role = FrontendNodeType.FOLLOWER; - } - Storage storage = new Storage(IMAGE_DIR); - if (role == FrontendNodeType.UNKNOWN) { - LOG.error("current node is not added to the group. please add it first."); - System.exit(-1); + } else { + // Designate one helper node. Get the roll and version info + // from the helper node + Storage storage = null; + // try to get role from helper node, + // this loop will not end until we get centain role type + while(true) { + role = getFeNodeType(); + storage = new Storage(IMAGE_DIR); + if (role == FrontendNodeType.UNKNOWN) { + LOG.error("current node is not added to the group. 
please add it first."); + // System.exit(-1); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + System.exit(-1); + } + } else { + break; + } } if (roleFile.exists() && role != storage.getRole() || !roleFile.exists()) { storage.writeFrontendRole(role); diff --git a/fe/src/com/baidu/palo/common/proc/JobsProcDir.java b/fe/src/com/baidu/palo/common/proc/JobsProcDir.java index bb3a84c5c89552..f8922baf184875 100644 --- a/fe/src/com/baidu/palo/common/proc/JobsProcDir.java +++ b/fe/src/com/baidu/palo/common/proc/JobsProcDir.java @@ -29,6 +29,8 @@ import com.baidu.palo.catalog.Database; import com.baidu.palo.clone.CloneJob.JobState; import com.baidu.palo.common.AnalysisException; +import com.baidu.palo.load.ExportJob; +import com.baidu.palo.load.ExportMgr; import com.baidu.palo.load.Load; import com.google.common.base.Preconditions; @@ -179,10 +181,14 @@ public ProcResult fetchResult() throws AnalysisException { finishedNum.toString(), cancelledNum.toString(), totalNum.toString())); // export - // TODO(lingbin): add abstract info later - result.addRow(Lists.newArrayList( - EXPORT, "pendingNum", "runningNum", - "finishedNum", "cancelledNum", "totalNum")); + ExportMgr exportMgr = Catalog.getInstance().getExportMgr(); + pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING, dbId); + runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING, dbId); + finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED, dbId); + cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED, dbId); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); return result; } diff --git a/fe/src/com/baidu/palo/http/action/IndexAction.java b/fe/src/com/baidu/palo/http/action/IndexAction.java index 1723da8f572aca..5a5d50e15bd9d2 100755 --- a/fe/src/com/baidu/palo/http/action/IndexAction.java +++ b/fe/src/com/baidu/palo/http/action/IndexAction.java @@ -41,13 +41,12 @@ public void executeGet(BaseRequest request, BaseResponse response) { getPageFooter(response.getContent()); writeResponse(request, response); } - private void appendVersionInfo(StringBuilder buffer) { buffer.append("
<h2>Version</h2>"); - buffer.append("<pre>version info"); +        buffer.append("<pre>version info<br/>"); buffer.append("Version: " + Version.PALO_BUILD_VERSION + "<br/>"); - buffer.append("Svn: " + Version.PALO_BUILD_HASH + "<br/>"); + buffer.append("Git: " + Version.PALO_BUILD_HASH + "<br/>"); buffer.append("Build Info: " + Version.PALO_BUILD_INFO + "<br/>"); buffer.append("Build Time: " + Version.PALO_BUILD_TIME + "<br/>"); buffer.append("</pre>"); diff --git a/fe/src/com/baidu/palo/http/action/SessionAction.java index 291d22c714a6a3..425de5ef2b0929 100755 --- a/fe/src/com/baidu/palo/http/action/SessionAction.java +++ b/fe/src/com/baidu/palo/http/action/SessionAction.java @@ -72,7 +72,7 @@ private void appendSessionInfo(StringBuilder buffer) { buffer.append("<p>This page lists the session info, there are " + rowSet.size() - + "active sessions.</p>"); + + " active sessions.</p>
"); appendTableHeader(buffer, SESSION_TABLE_HEADER); appendTableBody(buffer, rowSet); diff --git a/fe/src/com/baidu/palo/load/ExportMgr.java b/fe/src/com/baidu/palo/load/ExportMgr.java index f912dae406dffa..5e79925aeb603b 100644 --- a/fe/src/com/baidu/palo/load/ExportMgr.java +++ b/fe/src/com/baidu/palo/load/ExportMgr.java @@ -18,16 +18,12 @@ import com.baidu.palo.analysis.BrokerDesc; import com.baidu.palo.analysis.ExportStmt; import com.baidu.palo.catalog.Catalog; -import com.baidu.palo.catalog.Database; -import com.baidu.palo.catalog.Table; import com.baidu.palo.common.Config; import com.baidu.palo.common.util.ListComparator; import com.baidu.palo.common.util.OrderByPair; import com.baidu.palo.common.util.TimeUtils; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -242,4 +238,19 @@ public void replayUpdateJobState(long jobId, ExportJob.JobState newState) { writeUnlock(); } } + + public Integer getJobNum(ExportJob.JobState state, long dbId) { + int size = 0; + readLock(); + try { + for (ExportJob job : idToJob.values()) { + if (job.getState() == state && job.getDbId() == dbId) { + ++size; + } + } + } finally { + readUnlock(); + } + return size; + } } From 9f4fb6208ecd8dd9eceef3c66cf7e72d1f3bc741 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 4 Sep 2017 17:30:02 +0800 Subject: [PATCH 3/5] merge from baidu/master --- fe/src/com/baidu/palo/PaloFe.java | 2 +- .../palo/alter/DecommissionBackendJob.java | 23 +- fe/src/com/baidu/palo/analysis/Analyzer.java | 20 +- .../com/baidu/palo/analysis/DateLiteral.java | 4 + fe/src/com/baidu/palo/catalog/Catalog.java | 39 +- fe/src/com/baidu/palo/catalog/Column.java | 2 +- fe/src/com/baidu/palo/catalog/ScalarType.java | 14 + .../com/baidu/palo/catalog/StructField.java | 14 + fe/src/com/baidu/palo/catalog/Type.java | 18 + fe/src/com/baidu/palo/clone/CloneChecker.java | 5 +- fe/src/com/baidu/palo/cluster/Cluster.java | 1 + fe/src/com/baidu/palo/common/Config.java | 9 + fe/src/com/baidu/palo/common/ErrorCode.java | 2 +- .../com/baidu/palo/common/FeNameFormat.java | 2 + fe/src/com/baidu/palo/http/BaseAction.java | 3 +- fe/src/com/baidu/palo/http/HttpServer.java | 11 + .../http/rest/AddApacheHdfsBrokerAction.java | 91 ++ .../palo/http/rest/AddBackendAction.java | 84 ++ .../palo/http/rest/AddFrontendAction.java | 107 ++ .../palo/http/rest/BootstrapFinishAction.java | 69 ++ .../palo/http/rest/GetLoadInfoAction.java | 7 +- .../com/baidu/palo/http/rest/MultiAbort.java | 2 + .../com/baidu/palo/http/rest/MultiCommit.java | 4 +- .../com/baidu/palo/http/rest/MultiDesc.java | 8 +- .../com/baidu/palo/http/rest/MultiList.java | 8 +- .../com/baidu/palo/http/rest/MultiStart.java | 8 +- .../com/baidu/palo/http/rest/MultiUnload.java | 1 + fe/src/com/baidu/palo/load/DppScheduler.java | 1044 ++++++++--------- fe/src/com/baidu/palo/load/Load.java | 41 +- .../baidu/palo/planner/SingleNodePlanner.java | 2 +- .../baidu/palo/service/FrontendOptions.java | 2 - .../baidu/palo/system/SystemInfoService.java | 6 +- 32 files changed, 1054 insertions(+), 599 deletions(-) create mode 100644 fe/src/com/baidu/palo/http/rest/AddApacheHdfsBrokerAction.java create mode 100644 fe/src/com/baidu/palo/http/rest/AddBackendAction.java create mode 100644 fe/src/com/baidu/palo/http/rest/AddFrontendAction.java create mode 100644 fe/src/com/baidu/palo/http/rest/BootstrapFinishAction.java diff --git a/fe/src/com/baidu/palo/PaloFe.java 
b/fe/src/com/baidu/palo/PaloFe.java index cef5846cb66ecd..88824205843907 100644 --- a/fe/src/com/baidu/palo/PaloFe.java +++ b/fe/src/com/baidu/palo/PaloFe.java @@ -52,7 +52,7 @@ public static void main(String[] args) { } // pid file - if (!createAndLockPidFile(paloHome + "/bin/fe.pid")) { + if (!createAndLockPidFile(System.getenv("PID_DIR") + "/fe.pid")) { throw new IOException("pid file is already locked."); } diff --git a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java index ddad005e96bc2d..304277164e0fe2 100644 --- a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java +++ b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java @@ -571,6 +571,17 @@ public void readFields(DataInput in) throws IOException { } clusterBackendsMap.put(cluster, backends); } + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_33) { + String str = Text.readString(in); + // this is only for rectify misspellings... + if (str.equals("SystemDecomission")) { + str = "SystemDecommission"; + } else if (str.equals("ClusterDecomission")) { + str = "ClusterDecommission"; + } + decommissionType = DecommissionType.valueOf(str); + } } else { int backendNum = in.readInt(); Map backends = Maps.newHashMap(); @@ -582,17 +593,6 @@ public void readFields(DataInput in) throws IOException { } clusterBackendsMap.put(SystemInfoService.DEFAULT_CLUSTER, backends); } - - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_33) { - String str = Text.readString(in); - // this is only for rectify misspellings... - if (str.equals("SystemDecomission")) { - str = "SystemDecommission"; - } else if (str.equals("ClusterDecomission")) { - str = "ClusterDecommission"; - } - decommissionType = DecommissionType.valueOf(str); - } } @Override @@ -608,6 +608,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(id); } } + Text.writeString(out, decommissionType.toString()); } diff --git a/fe/src/com/baidu/palo/analysis/Analyzer.java b/fe/src/com/baidu/palo/analysis/Analyzer.java index 379a2e86650582..3d2730183907fe 100644 --- a/fe/src/com/baidu/palo/analysis/Analyzer.java +++ b/fe/src/com/baidu/palo/analysis/Analyzer.java @@ -733,15 +733,15 @@ private void registerConjunct(Expr e) { e.setId(globalState.conjunctIdGenerator.getNextId()); globalState.conjuncts.put(e.getId(), e); + // LOG.info("registered conjunct " + p.getId().toString() + ": " + p.toSql()); - ArrayList tupleIds = Lists.newArrayList(); ArrayList slotIds = Lists.newArrayList(); e.getIds(tupleIds, slotIds); // register full join conjuncts registerFullOuterJoinedConjunct(e); - + // update tuplePredicates for (TupleId id : tupleIds) { if (!tuplePredicates.containsKey(id)) { @@ -860,10 +860,10 @@ public List getUnassignedConjuncts( public List getAllUnassignedConjuncts(List tupleIds) { List result = Lists.newArrayList(); for (Expr e : globalState.conjuncts.values()) { - if (!e.isAuxExpr() - && e.isBoundByTupleIds(tupleIds) - && !globalState.assignedConjuncts.contains(e.getId()) - && !globalState.ojClauseByConjunct.containsKey(e.getId())) { + if (!e.isAuxExpr() + && e.isBoundByTupleIds(tupleIds) + && !globalState.assignedConjuncts.contains(e.getId()) + && !globalState.ojClauseByConjunct.containsKey(e.getId())) { result.add(e); } } @@ -1270,8 +1270,10 @@ public Type castAllToCompatibleType(List exprs) throws AnalysisException { // TODO(zc) compatibleType = Type.getCmpType(compatibleType, exprs.get(i).getType()); } - if (compatibleType == Type.VARCHAR && 
exprs.get(0).getType().isDateType()) { - compatibleType = Type.DATETIME; + if (compatibleType == Type.VARCHAR) { + if (exprs.get(0).getType().isDateType()) { + compatibleType = Type.DATETIME; + } } // Add implicit casts if necessary. for (int i = 0; i < exprs.size(); ++i) { @@ -1417,7 +1419,7 @@ public boolean canEvalPredicate(List tupleIds, Expr e) { } if (e.isOnClauseConjunct()) { - + if (isAntiJoinedConjunct(e)) return canEvalAntiJoinedConjunct(e, tupleIds); if (isIjConjunct(e) || isSjConjunct(e)) { if (!containsOuterJoinedTid(tids)) return true; diff --git a/fe/src/com/baidu/palo/analysis/DateLiteral.java b/fe/src/com/baidu/palo/analysis/DateLiteral.java index c21066c00c6735..630a1e66c17eed 100644 --- a/fe/src/com/baidu/palo/analysis/DateLiteral.java +++ b/fe/src/com/baidu/palo/analysis/DateLiteral.java @@ -30,6 +30,9 @@ import com.google.common.base.Preconditions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -37,6 +40,7 @@ import java.util.Date; public class DateLiteral extends LiteralExpr { + private static final Logger LOG = LogManager.getLogger(DateLiteral.class); private Date date; private DateLiteral() { diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java index fcb12e883fba9b..2e21e247f8546d 100644 --- a/fe/src/com/baidu/palo/catalog/Catalog.java +++ b/fe/src/com/baidu/palo/catalog/Catalog.java @@ -543,6 +543,10 @@ private void getClusterIdAndRole() throws IOException { // this loop will not end until we get centain role type while(true) { role = getFeNodeType(); + if (role == FrontendNodeType.REPLICA) { + // for compatibility + role = FrontendNodeType.FOLLOWER; + } storage = new Storage(IMAGE_DIR); if (role == FrontendNodeType.UNKNOWN) { LOG.error("current node is not added to the group. please add it first."); @@ -557,6 +561,7 @@ private void getClusterIdAndRole() throws IOException { break; } } + if (roleFile.exists() && role != storage.getRole() || !roleFile.exists()) { storage.writeFrontendRole(role); } @@ -733,14 +738,18 @@ private void transferToMaster() throws IOException { // catalog recycle bin getRecycleBin().start(); - this.masterIp = FrontendOptions.getLocalHostAddress(); + if (!Config.master_ip.equals("0.0.0.0")) { + this.masterIp = Config.master_ip; + } else { + this.masterIp = FrontendOptions.getLocalHostAddress(); + } this.masterRpcPort = Config.rpc_port; this.masterHttpPort = Config.http_port; MasterInfo info = new MasterInfo(); - info.setIp(FrontendOptions.getLocalHostAddress()); - info.setRpcPort(Config.rpc_port); - info.setHttpPort(Config.http_port); + info.setIp(masterIp); + info.setRpcPort(masterRpcPort); + info.setHttpPort(masterHttpPort); editLog.logMasterInfo(info); createTimePrinter(); @@ -749,7 +758,7 @@ private void transferToMaster() throws IOException { timePrinter.setInterval(tsInterval); timePrinter.start(); - if (isDefaultClusterCreated) { + if (!isDefaultClusterCreated) { initDefaultCluster(); } } @@ -1637,8 +1646,7 @@ private void setCanRead(boolean hasLog, boolean err) { LOG.warn("meta out of date. current time: {}, synchronized time: {}, has log: {}, fe type: {}", currentTimeMs, synchronizedTimeMs, hasLog, feType); if (hasLog || (!hasLog && feType == FrontendNodeType.UNKNOWN)) { - // 1. if we read log from BDB, which means master is still - // alive. + // 1. if we read log from BDB, which means master is still alive. // So we need to set meta out of date. // 2. 
if we didn't read any log from BDB and feType is UNKNOWN, // which means this non-master node is disconnected with master. @@ -1646,6 +1654,14 @@ private void setCanRead(boolean hasLog, boolean err) { metaReplayState.setOutOfDate(currentTimeMs, synchronizedTimeMs); canRead = false; } + + // sleep 5s to avoid numerous 'meta out of date' log + try { + Thread.sleep(5000L); + } catch (InterruptedException e) { + LOG.error("unhandled exception when sleep", e); + } + } else { canRead = true; } @@ -1930,10 +1946,9 @@ public void dropDb(DropDbStmt stmt) throws DdlException { } // 2. drop tables in db - Database db = fullNameToDb.get(dbName); + Database db = this.fullNameToDb.get(dbName); db.writeLock(); try { - if (db.getDbState() == DbState.LINK && dbName.equals(db.getAttachDb())) { final DropLinkDbAndUpdateDbInfo info = new DropLinkDbAndUpdateDbInfo(); fullNameToDb.remove(db.getAttachDb()); @@ -4444,9 +4459,9 @@ private void unprotectCreateCluster(Cluster cluster) { nameToCluster.put(cluster.getName(), cluster); // create info schema db - final InfoSchemaDb db = new InfoSchemaDb(cluster.getName()); - db.setClusterName(cluster.getName()); - unprotectCreateDb(db); + final InfoSchemaDb infoDb = new InfoSchemaDb(cluster.getName()); + infoDb.setClusterName(cluster.getName()); + unprotectCreateDb(infoDb); // only need to create default cluster once. if (cluster.getName().equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) { diff --git a/fe/src/com/baidu/palo/catalog/Column.java b/fe/src/com/baidu/palo/catalog/Column.java index 98992d05581d14..eada849b2de450 100644 --- a/fe/src/com/baidu/palo/catalog/Column.java +++ b/fe/src/com/baidu/palo/catalog/Column.java @@ -216,7 +216,7 @@ public void analyze(boolean isOlap) throws AnalysisException { if (columnType.getType() == PrimitiveType.FLOAT || columnType.getType() == PrimitiveType.DOUBLE) { if (isOlap && isKey) { - throw new AnalysisException("Float or double can't be used as a key, use decimal instead."); + throw new AnalysisException("Float or double can not used as a key, use decimal instead."); } } diff --git a/fe/src/com/baidu/palo/catalog/ScalarType.java b/fe/src/com/baidu/palo/catalog/ScalarType.java index 8fce0a220dfddc..1265bd0bc6643d 100644 --- a/fe/src/com/baidu/palo/catalog/ScalarType.java +++ b/fe/src/com/baidu/palo/catalog/ScalarType.java @@ -20,6 +20,20 @@ package com.baidu.palo.catalog; +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + import com.baidu.palo.thrift.TScalarType; import com.baidu.palo.thrift.TTypeDesc; import com.baidu.palo.thrift.TTypeNode; diff --git a/fe/src/com/baidu/palo/catalog/StructField.java b/fe/src/com/baidu/palo/catalog/StructField.java index 5d300da3058bd2..c8eadc00c58fc3 100644 --- a/fe/src/com/baidu/palo/catalog/StructField.java +++ b/fe/src/com/baidu/palo/catalog/StructField.java @@ -20,6 +20,20 @@ package com.baidu.palo.catalog; +// Copyright 2012 Cloudera Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + import com.baidu.palo.thrift.TColumnType; import com.baidu.palo.thrift.TStructField; import com.baidu.palo.thrift.TTypeDesc; diff --git a/fe/src/com/baidu/palo/catalog/Type.java b/fe/src/com/baidu/palo/catalog/Type.java index 51fb7ba603a5c8..1b1804198094a7 100644 --- a/fe/src/com/baidu/palo/catalog/Type.java +++ b/fe/src/com/baidu/palo/catalog/Type.java @@ -20,6 +20,20 @@ package com.baidu.palo.catalog; +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + import java.io.StringReader; import java.util.ArrayList; import java.util.List; @@ -228,6 +242,10 @@ public boolean isDateType() { return isScalarType(PrimitiveType.DATE) || isScalarType(PrimitiveType.DATETIME); } + public boolean isDatetime() { + return isScalarType(PrimitiveType.DATETIME); + } + public boolean isComplexType() { return isStructType() || isCollectionType(); } diff --git a/fe/src/com/baidu/palo/clone/CloneChecker.java b/fe/src/com/baidu/palo/clone/CloneChecker.java index 5742fb20b16024..f3d78182dccb69 100644 --- a/fe/src/com/baidu/palo/clone/CloneChecker.java +++ b/fe/src/com/baidu/palo/clone/CloneChecker.java @@ -31,7 +31,6 @@ import com.baidu.palo.catalog.Catalog; import com.baidu.palo.catalog.DataProperty; import com.baidu.palo.catalog.Database; -import com.baidu.palo.catalog.Database.DbState; import com.baidu.palo.catalog.MaterializedIndex; import com.baidu.palo.catalog.MaterializedIndex.IndexState; import com.baidu.palo.catalog.OlapTable; @@ -43,6 +42,7 @@ import com.baidu.palo.catalog.Table; import com.baidu.palo.catalog.Table.TableType; import com.baidu.palo.catalog.Tablet; +import com.baidu.palo.catalog.Database.DbState; import com.baidu.palo.clone.CloneJob.JobPriority; import com.baidu.palo.clone.CloneJob.JobState; import com.baidu.palo.clone.CloneJob.JobType; @@ -260,7 +260,7 @@ private void checkTablets() { LOG.warn("failed to init backend infos of cluster: {}", clusterName); continue; } - + final Map>> clusterCapacityLevelToBackendIds = initBackendCapacityInfos(backendInfosInCluster); if (clusterCapacityLevelToBackendIds == null || clusterCapacityLevelToBackendIds.isEmpty()) { @@ -269,7 +269,6 @@ private void checkTablets() { } long dbId = db.getId(); - Set tableNames = db.getTableNamesWithLock(); boolean hasMigrations = false; // check table by table diff --git a/fe/src/com/baidu/palo/cluster/Cluster.java b/fe/src/com/baidu/palo/cluster/Cluster.java index 92daecf226f076..eaa64db7c9e7c9 100644 --- 
a/fe/src/com/baidu/palo/cluster/Cluster.java +++ b/fe/src/com/baidu/palo/cluster/Cluster.java @@ -48,6 +48,7 @@ */ public class Cluster implements Writable { private static final Logger LOG = LogManager.getLogger(Cluster.class); + private Long id; private String name; // backend which cluster own diff --git a/fe/src/com/baidu/palo/common/Config.java b/fe/src/com/baidu/palo/common/Config.java index 9897e8f38417ef..8d03e3891f6bf2 100644 --- a/fe/src/com/baidu/palo/common/Config.java +++ b/fe/src/com/baidu/palo/common/Config.java @@ -15,6 +15,8 @@ package com.baidu.palo.common; +import java.net.InetAddress; + public class Config extends ConfigBase { /* @@ -119,6 +121,13 @@ public class Config extends ConfigBase { */ @ConfField public static String replica_ack_policy = "SIMPLE_MAJORITY"; // ALL, NONE, SIMPLE_MAJORITY + /* + * Specified a ip for frontend, instead of ip get by *InetAddress.getByName*. + * This can be used when *InetAddress.getByName* get a unexpected ip address. + * Default is "0.0.0.0", which means not set. + */ + @ConfField public static String master_ip = "0.0.0.0"; + /* * Kudu is currently not supported. */ diff --git a/fe/src/com/baidu/palo/common/ErrorCode.java b/fe/src/com/baidu/palo/common/ErrorCode.java index 1f1b1af4253193..fe6412f922baf1 100644 --- a/fe/src/com/baidu/palo/common/ErrorCode.java +++ b/fe/src/com/baidu/palo/common/ErrorCode.java @@ -149,7 +149,7 @@ public enum ErrorCode { ERR_CLUSTER_INSTANCE_NUM_WRONG(5035, new byte[] { 'H', 'Y', '0', '0', '0' }, "Cluster '%s' has exist"), ERR_CLUSTER_BE_NOT_ENOUGH(5036, new byte[] { 'H', 'Y', '0', '0', '0' }, "Be is not enough"), ERR_CLUSTER_DELETE_DB_EXIST(5037, new byte[] { 'H', 'Y', '0', '0', '0' }, - "All datbases in cluster must be dropped before dropping cluster."), + "All datbases in cluster must be dropped before dropping cluster"), ERR_CLUSTER_DELETE_BE_ID_ERROR(5037, new byte[] { 'H', 'Y', '0', '0', '0' }, "There is no be's id in the System"), ERR_CLUSTER_NO_CLUSTER_NAME(5038, new byte[] { 'H', 'Y', '0', '0', '0' }, "There is no cluster name"), ERR_CLUSTER_SHOW_ACCESS_DENIED(5039, new byte[] {'4', '2', '0', '0', '0'}, diff --git a/fe/src/com/baidu/palo/common/FeNameFormat.java b/fe/src/com/baidu/palo/common/FeNameFormat.java index 3f97ef273b15da..f0cd0165c5a9a8 100644 --- a/fe/src/com/baidu/palo/common/FeNameFormat.java +++ b/fe/src/com/baidu/palo/common/FeNameFormat.java @@ -20,6 +20,8 @@ package com.baidu.palo.common; +import com.baidu.palo.analysis.ClusterName; +import com.baidu.palo.cluster.ClusterNamespace; import com.baidu.palo.system.SystemInfoService; import com.google.common.base.Strings; diff --git a/fe/src/com/baidu/palo/http/BaseAction.java b/fe/src/com/baidu/palo/http/BaseAction.java index 8427440c4cd7fd..efe0c84bf86bdd 100644 --- a/fe/src/com/baidu/palo/http/BaseAction.java +++ b/fe/src/com/baidu/palo/http/BaseAction.java @@ -35,6 +35,7 @@ import com.baidu.palo.mysql.MysqlPassword; import com.baidu.palo.qe.QeService; import com.baidu.palo.system.SystemInfoService; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -250,7 +251,7 @@ public boolean parseAuth(BaseRequest request, AuthorizationInfo authInfo) { final String[] elements = authInfo.fullUserName.split("@"); if (elements != null && elements.length < 2) { authInfo.fullUserName = ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, - authInfo.fullUserName); + authInfo.fullUserName); authInfo.cluster = SystemInfoService.DEFAULT_CLUSTER; } else if (elements != null && elements.length == 2) { 
authInfo.fullUserName = ClusterNamespace.getFullName(elements[1], elements[0]); diff --git a/fe/src/com/baidu/palo/http/HttpServer.java b/fe/src/com/baidu/palo/http/HttpServer.java index ad3f06e094bc17..bfe4adb5ad27a2 100755 --- a/fe/src/com/baidu/palo/http/HttpServer.java +++ b/fe/src/com/baidu/palo/http/HttpServer.java @@ -35,6 +35,10 @@ import com.baidu.palo.http.meta.MetaService.PutAction; import com.baidu.palo.http.meta.MetaService.RoleAction; import com.baidu.palo.http.meta.MetaService.VersionAction; +import com.baidu.palo.http.rest.AddApacheHdfsBrokerAction; +import com.baidu.palo.http.rest.AddBackendAction; +import com.baidu.palo.http.rest.AddFrontendAction; +import com.baidu.palo.http.rest.BootstrapFinishAction; import com.baidu.palo.http.rest.CheckDecommissionAction; import com.baidu.palo.http.rest.GetDdlStmtAction; import com.baidu.palo.http.rest.GetLoadInfoAction; @@ -146,6 +150,13 @@ private void registerActions() throws IllegalArgException { CheckAction.registerAction(controller, imageDir); DumpAction.registerAction(controller, imageDir); RoleAction.registerAction(controller, imageDir); + + // add frontend backend broker action + AddBackendAction.registerAction(controller); + AddFrontendAction.registerAction(controller); + AddApacheHdfsBrokerAction.registerAction(controller); + + BootstrapFinishAction.registerAction(controller); } public void start() { diff --git a/fe/src/com/baidu/palo/http/rest/AddApacheHdfsBrokerAction.java b/fe/src/com/baidu/palo/http/rest/AddApacheHdfsBrokerAction.java new file mode 100644 index 00000000000000..1fc52ea1f16e99 --- /dev/null +++ b/fe/src/com/baidu/palo/http/rest/AddApacheHdfsBrokerAction.java @@ -0,0 +1,91 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.baidu.palo.http.rest; + +import com.baidu.palo.catalog.Catalog; +import com.baidu.palo.common.AnalysisException; +import com.baidu.palo.common.DdlException; +import com.baidu.palo.common.Pair; +import com.baidu.palo.http.ActionController; +import com.baidu.palo.http.BaseRequest; +import com.baidu.palo.http.BaseResponse; +import com.baidu.palo.http.IllegalArgException; +import com.baidu.palo.system.SystemInfoService; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import java.util.List; + +import io.netty.handler.codec.http.HttpMethod; + +/* + * fe_host:fe_http_port/api/add_apache_hdfs_broker?name=broker_name\&host_ports=host:port,host2:port2... 
+ * return: + * {"status":"OK","msg":"Success"} + * {"status":"FAILED","msg":"err info..."} + */ +public class AddApacheHdfsBrokerAction extends RestBaseAction { + public static final String NAME = "name"; + public static final String HOST_PORTS = "host_ports"; + + public AddApacheHdfsBrokerAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + controller.registerHandler(HttpMethod.GET, + "/api/add_apache_hdfs_broker", + new AddApacheHdfsBrokerAction(controller)); + } + + @Override + public void execute(BaseRequest request, BaseResponse response) throws DdlException { + String name = request.getSingleParameter(NAME); + if (Strings.isNullOrEmpty(name)) { + throw new DdlException("No broker name specified."); + } + String hostPorts = request.getSingleParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { + throw new DdlException("No host:port specified."); + } + + String[] hostPortArr = hostPorts.split(","); + if (hostPortArr.length == 0) { + throw new DdlException("No host:port specified."); + } + + List> hostPortPairs = Lists.newArrayList(); + for (String hostPort : hostPortArr) { + Pair pair; + try { + pair = SystemInfoService.validateHostAndPort(hostPort); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + hostPortPairs.add(pair); + } + + Catalog.getInstance().getBrokerMgr().addBrokers(name, hostPortPairs); + + // to json response + RestBaseResult result = new RestBaseResult(); + + // send result + response.setContentType("application/json"); + response.getContent().append(result.toJson()); + sendResult(request, response); + } +} diff --git a/fe/src/com/baidu/palo/http/rest/AddBackendAction.java b/fe/src/com/baidu/palo/http/rest/AddBackendAction.java new file mode 100644 index 00000000000000..1f0bf4ff7eb18d --- /dev/null +++ b/fe/src/com/baidu/palo/http/rest/AddBackendAction.java @@ -0,0 +1,84 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.baidu.palo.http.rest; + +import com.baidu.palo.catalog.Catalog; +import com.baidu.palo.common.AnalysisException; +import com.baidu.palo.common.DdlException; +import com.baidu.palo.common.Pair; +import com.baidu.palo.http.ActionController; +import com.baidu.palo.http.BaseRequest; +import com.baidu.palo.http.BaseResponse; +import com.baidu.palo.http.IllegalArgException; +import com.baidu.palo.system.SystemInfoService; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import java.util.List; + +import io.netty.handler.codec.http.HttpMethod; + +/* + * fe_host:fe_http_port/api/add_backend?host_ports=host:port,host2:port2... 
+ * return: + * {"status":"OK","msg":"Success"} + * {"status":"FAILED","msg":"err info..."} + */ +public class AddBackendAction extends RestBaseAction { + public static final String HOST_PORTS = "host_ports"; + + public AddBackendAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + controller.registerHandler(HttpMethod.GET, "/api/add_backend", new AddBackendAction(controller)); + } + + @Override + public void execute(BaseRequest request, BaseResponse response) throws DdlException { + String hostPorts = request.getSingleParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { + throw new DdlException("No host:port specified."); + } + + String[] hostPortArr = hostPorts.split(","); + if (hostPortArr.length == 0) { + throw new DdlException("No host:port specified."); + } + + List> hostPortPairs = Lists.newArrayList(); + for (String hostPort : hostPortArr) { + Pair pair; + try { + pair = SystemInfoService.validateHostAndPort(hostPort); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + hostPortPairs.add(pair); + } + + Catalog.getCurrentSystemInfo().addBackends(hostPortPairs, false); + + // to json response + RestBaseResult result = new RestBaseResult(); + + // send result + response.setContentType("application/json"); + response.getContent().append(result.toJson()); + sendResult(request, response); + } +} diff --git a/fe/src/com/baidu/palo/http/rest/AddFrontendAction.java b/fe/src/com/baidu/palo/http/rest/AddFrontendAction.java new file mode 100644 index 00000000000000..116850e2a8d45b --- /dev/null +++ b/fe/src/com/baidu/palo/http/rest/AddFrontendAction.java @@ -0,0 +1,107 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.baidu.palo.http.rest; + +import com.baidu.palo.catalog.Catalog; +import com.baidu.palo.common.AnalysisException; +import com.baidu.palo.common.DdlException; +import com.baidu.palo.common.Pair; +import com.baidu.palo.ha.FrontendNodeType; +import com.baidu.palo.http.ActionController; +import com.baidu.palo.http.BaseRequest; +import com.baidu.palo.http.BaseResponse; +import com.baidu.palo.http.IllegalArgException; +import com.baidu.palo.system.SystemInfoService; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import java.util.List; + +import io.netty.handler.codec.http.HttpMethod; + +/* + * fe_host:fe_http_port/api/add_frontend?role=follower\&host_ports=host:port,host2:port2... + * fe_host:fe_http_port/api/add_frontend?role=observer\&host_ports=host:port,host2:port2... 
+ * return: + * {"status":"OK","msg":"Success"} + * {"status":"FAILED","msg":"err info..."} + */ +public class AddFrontendAction extends RestBaseAction { + public static final String ROLE = "role"; + public static final String FOLLOWER = "follower"; + public static final String OBSERVER = "observer"; + public static final String HOST_PORTS = "host_ports"; + + public AddFrontendAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + controller.registerHandler(HttpMethod.GET, "/api/add_frontend", new AddFrontendAction(controller)); + } + + @Override + public void execute(BaseRequest request, BaseResponse response) throws DdlException { + String role = request.getSingleParameter(ROLE); + if (Strings.isNullOrEmpty(role)) { + throw new DdlException("No frontend role specified."); + } + + if (!role.equals(FOLLOWER) && !role.equals(OBSERVER)) { + throw new DdlException("frontend role must specified to follower or observer"); + } + + String hostPorts = request.getSingleParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { + throw new DdlException("No host:port specified."); + } + + String[] hostPortArr = hostPorts.split(","); + if (hostPortArr.length == 0) { + throw new DdlException("No host:port specified."); + } + + List> hostPortPairs = Lists.newArrayList(); + for (String hostPort : hostPortArr) { + Pair pair; + try { + pair = SystemInfoService.validateHostAndPort(hostPort); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + hostPortPairs.add(pair); + } + + FrontendNodeType nodeType; + if (role.equals(FOLLOWER)) { + nodeType = FrontendNodeType.FOLLOWER; + } else { + nodeType = FrontendNodeType.OBSERVER; + } + + for (Pair hostPortPair : hostPortPairs) { + Catalog.getInstance().addFrontend(nodeType, hostPortPair.first, hostPortPair.second); + } + + // to json response + RestBaseResult result = new RestBaseResult(); + + // send result + response.setContentType("application/json"); + response.getContent().append(result.toJson()); + sendResult(request, response); + } +} diff --git a/fe/src/com/baidu/palo/http/rest/BootstrapFinishAction.java b/fe/src/com/baidu/palo/http/rest/BootstrapFinishAction.java new file mode 100644 index 00000000000000..85b76f6de10685 --- /dev/null +++ b/fe/src/com/baidu/palo/http/rest/BootstrapFinishAction.java @@ -0,0 +1,69 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.baidu.palo.http.rest; + +import com.baidu.palo.catalog.Catalog; +import com.baidu.palo.common.AnalysisException; +import com.baidu.palo.common.DdlException; +import com.baidu.palo.common.Pair; +import com.baidu.palo.http.ActionController; +import com.baidu.palo.http.BaseRequest; +import com.baidu.palo.http.BaseResponse; +import com.baidu.palo.http.IllegalArgException; +import com.baidu.palo.system.SystemInfoService; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import java.util.List; + +import io.netty.handler.codec.http.HttpMethod; + +/* + * fe_host:fe_http_port/api/bootstrap + * return: + * {"status":"OK","msg":"Success"} + * {"status":"FAILED","msg":"err info..."} + */ +public class BootstrapFinishAction extends RestBaseAction { + public static final String HOST_PORTS = "host_ports"; + + public BootstrapFinishAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + controller.registerHandler(HttpMethod.GET, "/api/bootstrap", new BootstrapFinishAction(controller)); + } + + @Override + public void execute(BaseRequest request, BaseResponse response) throws DdlException { + + boolean canRead = Catalog.getInstance().canRead(); + + // to json response + RestBaseResult result = null; + if (canRead) { + result = RestBaseResult.getOk(); + } else { + result = new RestBaseResult("unfinished"); + } + + // send result + response.setContentType("application/json"); + response.getContent().append(result.toJson()); + sendResult(request, response); + } +} diff --git a/fe/src/com/baidu/palo/http/rest/GetLoadInfoAction.java b/fe/src/com/baidu/palo/http/rest/GetLoadInfoAction.java index 18dcbb33ffa183..741b76022a107f 100644 --- a/fe/src/com/baidu/palo/http/rest/GetLoadInfoAction.java +++ b/fe/src/com/baidu/palo/http/rest/GetLoadInfoAction.java @@ -22,6 +22,7 @@ import com.baidu.palo.http.BaseResponse; import com.baidu.palo.http.IllegalArgException; import com.baidu.palo.load.Load; + import com.google.common.base.Strings; import io.netty.handler.codec.http.HttpMethod; @@ -44,8 +45,10 @@ public static void registerAction(ActionController controller) @Override public void execute(BaseRequest request, BaseResponse response) throws DdlException { AuthorizationInfo authInfo = getAuthorizationInfo(request); - Load.JobInfo info = new Load.JobInfo(request.getSingleParameter(DB_KEY), request.getSingleParameter(LABEL_KEY), - authInfo.cluster); + + Load.JobInfo info = new Load.JobInfo(request.getSingleParameter(DB_KEY), + request.getSingleParameter(LABEL_KEY), + authInfo.cluster); if (Strings.isNullOrEmpty(info.dbName)) { throw new DdlException("No database selected"); } diff --git a/fe/src/com/baidu/palo/http/rest/MultiAbort.java b/fe/src/com/baidu/palo/http/rest/MultiAbort.java index b51ddabb1caffa..adbcfbc08f2976 100644 --- a/fe/src/com/baidu/palo/http/rest/MultiAbort.java +++ b/fe/src/com/baidu/palo/http/rest/MultiAbort.java @@ -22,6 +22,7 @@ import com.baidu.palo.http.BaseResponse; import com.baidu.palo.http.IllegalArgException; import com.baidu.palo.service.ExecuteEnv; + import com.google.common.base.Strings; import io.netty.handler.codec.http.HttpMethod; @@ -53,6 +54,7 @@ public void execute(BaseRequest request, BaseResponse response) throws DdlExcept if (Strings.isNullOrEmpty(label)) { throw new DdlException("No label selected"); } + AuthorizationInfo authInfo = getAuthorizationInfo(request); String fullDbName = 
ClusterNamespace.getFullName(authInfo.cluster, db); diff --git a/fe/src/com/baidu/palo/http/rest/MultiCommit.java b/fe/src/com/baidu/palo/http/rest/MultiCommit.java index f04ed9c896521d..96ab34a00a204e 100644 --- a/fe/src/com/baidu/palo/http/rest/MultiCommit.java +++ b/fe/src/com/baidu/palo/http/rest/MultiCommit.java @@ -21,7 +21,9 @@ import com.baidu.palo.http.BaseRequest; import com.baidu.palo.http.BaseResponse; import com.baidu.palo.http.IllegalArgException; +import com.baidu.palo.http.BaseAction.AuthorizationInfo; import com.baidu.palo.service.ExecuteEnv; + import com.google.common.base.Strings; import io.netty.handler.codec.http.HttpMethod; @@ -53,11 +55,11 @@ public void execute(BaseRequest request, BaseResponse response) throws DdlExcept if (Strings.isNullOrEmpty(label)) { throw new DdlException("No label selected"); } + AuthorizationInfo authInfo = getAuthorizationInfo(request); String fullDbName = ClusterNamespace.getFullName(authInfo.cluster, db); checkWritePriv(authInfo.fullUserName, fullDbName); - if (redirectToMaster(request, response)) { return; } diff --git a/fe/src/com/baidu/palo/http/rest/MultiDesc.java b/fe/src/com/baidu/palo/http/rest/MultiDesc.java index a1c1228bbea2ea..04773de197c751 100644 --- a/fe/src/com/baidu/palo/http/rest/MultiDesc.java +++ b/fe/src/com/baidu/palo/http/rest/MultiDesc.java @@ -15,20 +15,22 @@ package com.baidu.palo.http.rest; -import java.util.List; - import com.baidu.palo.cluster.ClusterNamespace; import com.baidu.palo.common.DdlException; import com.baidu.palo.http.ActionController; import com.baidu.palo.http.BaseRequest; import com.baidu.palo.http.BaseResponse; import com.baidu.palo.http.IllegalArgException; +import com.baidu.palo.http.BaseAction.AuthorizationInfo; import com.baidu.palo.service.ExecuteEnv; + import com.google.common.base.Strings; import com.google.common.collect.Lists; import io.netty.handler.codec.http.HttpMethod; +import java.util.List; + // List all labels of one multi-load public class MultiDesc extends RestBaseAction { private static final String DB_KEY = "db"; @@ -57,9 +59,9 @@ public void execute(BaseRequest request, BaseResponse response) throws DdlExcept if (Strings.isNullOrEmpty(label)) { throw new DdlException("No label selected"); } + AuthorizationInfo authInfo = getAuthorizationInfo(request); String fullDbName = ClusterNamespace.getFullName(authInfo.cluster, db); - checkReadPriv(authInfo.fullUserName, fullDbName); if (redirectToMaster(request, response)) { diff --git a/fe/src/com/baidu/palo/http/rest/MultiList.java b/fe/src/com/baidu/palo/http/rest/MultiList.java index fd8afa3b0b1893..913fec17968e89 100644 --- a/fe/src/com/baidu/palo/http/rest/MultiList.java +++ b/fe/src/com/baidu/palo/http/rest/MultiList.java @@ -15,20 +15,22 @@ package com.baidu.palo.http.rest; -import java.util.List; - import com.baidu.palo.cluster.ClusterNamespace; import com.baidu.palo.common.DdlException; import com.baidu.palo.http.ActionController; import com.baidu.palo.http.BaseRequest; import com.baidu.palo.http.BaseResponse; import com.baidu.palo.http.IllegalArgException; +import com.baidu.palo.http.BaseAction.AuthorizationInfo; import com.baidu.palo.service.ExecuteEnv; + import com.google.common.base.Strings; import com.google.common.collect.Lists; import io.netty.handler.codec.http.HttpMethod; +import java.util.List; + // list all multi load before commit public class MultiList extends RestBaseAction { private static final String DB_KEY = "db"; @@ -52,9 +54,9 @@ public void execute(BaseRequest request, BaseResponse response) throws 
DdlExcept if (Strings.isNullOrEmpty(db)) { throw new DdlException("No database selected"); } + AuthorizationInfo authInfo = getAuthorizationInfo(request); String fullDbName = ClusterNamespace.getFullName(authInfo.cluster, db); - checkReadPriv(authInfo.fullUserName, fullDbName); if (redirectToMaster(request, response)) { diff --git a/fe/src/com/baidu/palo/http/rest/MultiStart.java b/fe/src/com/baidu/palo/http/rest/MultiStart.java index 6a7fe76f246c63..1e3db7091b625b 100644 --- a/fe/src/com/baidu/palo/http/rest/MultiStart.java +++ b/fe/src/com/baidu/palo/http/rest/MultiStart.java @@ -15,8 +15,6 @@ package com.baidu.palo.http.rest; -import java.util.Map; - import com.baidu.palo.analysis.LoadStmt; import com.baidu.palo.cluster.ClusterNamespace; import com.baidu.palo.common.DdlException; @@ -24,12 +22,16 @@ import com.baidu.palo.http.BaseRequest; import com.baidu.palo.http.BaseResponse; import com.baidu.palo.http.IllegalArgException; +import com.baidu.palo.http.BaseAction.AuthorizationInfo; import com.baidu.palo.service.ExecuteEnv; + import com.google.common.base.Strings; import com.google.common.collect.Maps; import io.netty.handler.codec.http.HttpMethod; +import java.util.Map; + // Start multi action public class MultiStart extends RestBaseAction { private static final String DB_KEY = "db"; @@ -58,9 +60,9 @@ public void execute(BaseRequest request, BaseResponse response) throws DdlExcept if (Strings.isNullOrEmpty(label)) { throw new DdlException("No label selected"); } + AuthorizationInfo authInfo = getAuthorizationInfo(request); String fullDbName = ClusterNamespace.getFullName(authInfo.cluster, db); - checkWritePriv(authInfo.fullUserName, fullDbName); if (redirectToMaster(request, response)) { diff --git a/fe/src/com/baidu/palo/http/rest/MultiUnload.java b/fe/src/com/baidu/palo/http/rest/MultiUnload.java index 7c1d5cb893fbc9..82dd63cae63856 100644 --- a/fe/src/com/baidu/palo/http/rest/MultiUnload.java +++ b/fe/src/com/baidu/palo/http/rest/MultiUnload.java @@ -22,6 +22,7 @@ import com.baidu.palo.http.BaseResponse; import com.baidu.palo.http.IllegalArgException; import com.baidu.palo.service.ExecuteEnv; + import com.google.common.base.Strings; import io.netty.handler.codec.http.HttpMethod; diff --git a/fe/src/com/baidu/palo/load/DppScheduler.java b/fe/src/com/baidu/palo/load/DppScheduler.java index a7a04d62ecfe2e..c3abfb9d511224 100644 --- a/fe/src/com/baidu/palo/load/DppScheduler.java +++ b/fe/src/com/baidu/palo/load/DppScheduler.java @@ -13,525 +13,525 @@ // specific language governing permissions and limitations // under the License. 
-package com.baidu.palo.load; - -import com.baidu.palo.common.Config; -import com.baidu.palo.common.FeConstants; -import com.baidu.palo.common.LoadException; -import com.baidu.palo.common.util.CommandResult; -import com.baidu.palo.common.util.Util; -import com.baidu.palo.thrift.TEtlState; -import com.baidu.palo.thrift.TStatus; -import com.baidu.palo.thrift.TStatusCode; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.gson.Gson; - -import org.apache.commons.lang.StringUtils; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; - -public class DppScheduler { - private static final Logger LOG = LogManager.getLogger(DppScheduler.class); - - private static final String PALO_HOME = System.getenv("PALO_HOME"); - private static final String HADOOP_CLIENT = PALO_HOME + Config.dpp_hadoop_client_path; - private static final String DPP_OUTPUT_DIR = "export"; - private static final String JOB_CONFIG_DIR = PALO_HOME + "/temp/job_conf"; - private static final String JOB_CONFIG_FILE = "jobconfig.json"; - private static final String LOCAL_DPP_DIR = PALO_HOME + "/lib/dpp/" + FeConstants.dpp_version; - private static final int DEFAULT_REDUCE_NUM = 1000; - private static final long GB = 1024 * 1024 * 1024L; - - // hdfs://host:port/outputPath/dbId/loadLabel/etlOutputDir - private static final String ETL_OUTPUT_PATH = "%s%s/%d/%s/%s"; - private static final String ETL_JOB_NAME = "palo2__%s__%s"; - - // hadoop command - private static final String HADOOP_BISTREAMING_CMD = "%s bistreaming %s -D mapred.job.name=\"%s\" " - + "-input %s -output %s " - + "-mapper \"sh mapred/mapper.sh\" " - + "-reducer \"sh mapred/reducer.sh '\\\"%s\\\"'\" " - + "-partitioner com.baidu.sos.mapred.lib.MapIntPartitioner " - + "-cacheArchive %s/dpp/x86_64-scm-linux-gnu.tar.gz#tc " - + "-cacheArchive %s/dpp/pypy.tar.gz#pypy " - + "-cacheArchive %s/dpp/palo_dpp_mr.tar.gz#mapred " - + "-numReduceTasks %d -file \"%s\" "; - private static final String HADOOP_STATUS_CMD = "%s job %s -status %s"; - private static final String HADOOP_KILL_CMD = "%s job %s -kill %s"; - private static final String HADOOP_LS_CMD = "%s fs %s -ls %s"; - private static final String HADOOP_COUNT_CMD = "%s fs %s -count %s"; - private static final String HADOOP_TEST_CMD = "%s fs %s -test %s %s"; - private static final String HADOOP_MKDIR_CMD = "%s fs %s -mkdir %s"; - private static final String HADOOP_RMR_CMD = "%s fs %s -rmr %s"; - private static final String HADOOP_PUT_CMD = "%s fs %s -put %s %s"; - private static final long HADOOP_SPEED_LIMIT_KB = 10240L; // 10M - - private static final ConcurrentMap DPP_LOCK_MAP = Maps.newConcurrentMap(); - - private String hadoopConfig; - private String applicationsPath; - - public DppScheduler(DppConfig dppConfig) { - hadoopConfig = getHadoopConfigsStr(dppConfig.getHadoopConfigs()); - applicationsPath = dppConfig.getFsDefaultName() + dppConfig.getApplicationsPath(); - } - - private String getHadoopConfigsStr(Map hadoopConfigs) { - List configs = 
Lists.newArrayList(); - for (Map.Entry entry : hadoopConfigs.entrySet()) { - configs.add(String.format("%s=%s", entry.getKey(), entry.getValue())); - } - return String.format("-D %s", StringUtils.join(configs, " -D ")); - } - - public EtlSubmitResult submitEtlJob(long jobId, String loadLabel, String clusterName, - String dbName, Map jobConf, int retry) { - String etlJobId = null; - TStatus status = new TStatus(); - status.setStatus_code(TStatusCode.OK); - List failMsgs = new ArrayList(); - status.setError_msgs(failMsgs); - - // check dpp lock map - if (retry > 0) { - // failed once, try check dpp application - LOG.warn("submit etl retry[{}] > 0. check dpp application", retry); - // prepare dpp applications - DPP_LOCK_MAP.putIfAbsent(clusterName, new Object()); - Preconditions.checkState(DPP_LOCK_MAP.containsKey(clusterName)); - synchronized (DPP_LOCK_MAP.get(clusterName)) { - try { - prepareDppApplications(); - } catch (LoadException e) { - return null; - } - } - } - - // create job config file - String configDirPath = JOB_CONFIG_DIR + "/" + jobId; - File configDir = new File(configDirPath); - if (!Util.deleteDirectory(configDir)) { - LOG.warn("delete config dir error. job[{}]", jobId); - return null; - } - if (!configDir.mkdirs()) { - LOG.warn("create config file dir error. job[{}]", jobId); - return null; - } - File configFile = new File(configDirPath + "/" + JOB_CONFIG_FILE); - BufferedWriter bw = null; - try { - bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(configFile), "UTF-8")); - Gson gson = new Gson(); - bw.write(gson.toJson(jobConf)); - bw.flush(); - } catch (IOException e) { - Util.deleteDirectory(configDir); - LOG.warn("create config file error. job[" + jobId + "]", e); - return null; - } finally { - if (bw != null) { - try { - bw.close(); - } catch (IOException e) { - LOG.warn("close buffered writer error", e); - return null; - } - } - } - - // create input path - Set inputPaths = getInputPaths(jobConf); - String inputPath = StringUtils.join(inputPaths, " -input "); - - // reduce num - int reduceNumByInputSize = 0; - try { - reduceNumByInputSize = calcReduceNumByInputSize(inputPaths); - } catch (InputSizeInvalidException e) { - failMsgs.add(e.getMessage()); - status.setStatus_code(TStatusCode.CANCELLED); - return new EtlSubmitResult(status, null); - } - int reduceNumByTablet = calcReduceNumByTablet(jobConf); - int reduceNum = Math.min(reduceNumByInputSize, reduceNumByTablet); - LOG.debug("calculate reduce num. 
reduceNum: {}, reduceNumByInputSize: {}, reduceNumByTablet: {}", - reduceNum, reduceNumByInputSize, reduceNumByTablet); - - // rm path - String outputPath = (String) jobConf.get("output_path"); - deleteEtlOutputPath(outputPath); - - // submit etl job - String etlJobName = String.format(ETL_JOB_NAME, dbName, loadLabel); - String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName, inputPath, - outputPath, hadoopConfig, applicationsPath, applicationsPath, applicationsPath, reduceNum, - configFile.getAbsolutePath()); - LOG.info(hadoopRunCmd); - String outputLine = null; - List hadoopRunCmdList = Util.shellSplit(hadoopRunCmd); - String[] hadoopRunCmds = hadoopRunCmdList.toArray(new String[0]); - BufferedReader errorReader = null; - long startTime = System.currentTimeMillis(); - try { - Process p = Runtime.getRuntime().exec(hadoopRunCmds); - errorReader = new BufferedReader(new InputStreamReader(p.getErrorStream())); - for (int i = 0; i < 1000; i++) { - outputLine = errorReader.readLine(); - LOG.info(outputLine); - if (Strings.isNullOrEmpty(outputLine)) { - LOG.warn("submit etl job fail. job id: {}, label: {}", jobId, loadLabel); - break; - } - - if (outputLine.toLowerCase().contains("error") - || outputLine.toLowerCase().contains("exception")) { - failMsgs.add(outputLine); - } - - if (outputLine.indexOf("Running job") != -1) { - String[] arr = outputLine.split(":"); - etlJobId = arr[arr.length - 1].trim(); - p.destroy(); - break; - } - } - } catch (IOException e) { - LOG.warn("submit etl job error", e); - return null; - } finally { - Util.deleteDirectory(configDir); - long endTime = System.currentTimeMillis(); - LOG.info("finished submit hadoop job: {}. cost: {} ms", jobId, endTime - startTime); - if (errorReader != null) { - try { - errorReader.close(); - } catch (IOException e) { - LOG.warn("close buffered reader error", e); - return null; - } - } - } - - if (etlJobId == null) { - status.setStatus_code(TStatusCode.CANCELLED); - } - return new EtlSubmitResult(status, etlJobId); - } - - private void prepareDppApplications() throws LoadException { - String hadoopDppDir = applicationsPath + "/dpp"; - boolean needUpload = false; - - // get local files - File dppDir = new File(LOCAL_DPP_DIR); - if (!dppDir.exists() || !dppDir.isDirectory()) { - LOG.warn("dpp dir does not exist"); - throw new LoadException("dpp dir does not exist"); - } - File[] localFiles = dppDir.listFiles(); - - // test hadoop dpp dir - String hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", hadoopDppDir); - LOG.info(hadoopTestCmd); - CommandResult testResult = Util.executeCommand(hadoopTestCmd); - if (testResult.getReturnCode() == 0) { - String hadoopDppFilePath = hadoopDppDir + "/*"; - String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppFilePath); - LOG.info(hadoopCountCmd); - CommandResult countResult = Util.executeCommand(hadoopCountCmd); - if (countResult.getReturnCode() != 0) { - LOG.warn("hadoop count error, result: {}", countResult); - throw new LoadException("hadoop count error. 
msg: " + countResult.getStderr()); - } - - Map fileMap = Maps.newHashMap(); - String[] fileInfos = countResult.getStdout().split("\n"); - for (String fileInfo : fileInfos) { - String[] fileInfoArr = fileInfo.trim().split(" +"); - if (fileInfoArr.length == 4) { - String filePath = fileInfoArr[3]; - String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); - long size = Long.parseLong(fileInfoArr[2]); - fileMap.put(fileName, size); - } - } - - // diff files - for (File file : localFiles) { - if (!file.isFile()) { - continue; - } - - String fileName = file.getName(); - if (!fileMap.containsKey(fileName)) { - LOG.info("hadoop dpp file does not exist. file: {}", fileName); - needUpload = true; - break; - } - - long localSize = file.length(); - long hadoopSize = fileMap.get(fileName); - if (localSize != hadoopSize) { - LOG.info("dpp files size are different. file: {}, local: {}, hadoop: {}", fileName, localSize, - hadoopSize); - needUpload = true; - break; - } - } - } else { - LOG.info("hadoop dir does not exist. dir: {}", hadoopDppDir); - needUpload = true; - } - - if (needUpload) { - // rmdir and mkdir - String hadoopRmrCmd = String.format(HADOOP_RMR_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppDir); - LOG.info(hadoopRmrCmd); - Util.executeCommand(hadoopRmrCmd); - String hadoopMkdirCmd = String.format(HADOOP_MKDIR_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppDir); - LOG.info(hadoopMkdirCmd); - Util.executeCommand(hadoopMkdirCmd); - - // upload dpp applications - String hadoopPutConfig = hadoopConfig + String.format(" -D speed.limit.kb=%d", HADOOP_SPEED_LIMIT_KB); - String hadoopPutCmd = null; - CommandResult putResult = null; - for (File file : localFiles) { - hadoopPutCmd = String.format(HADOOP_PUT_CMD, HADOOP_CLIENT, hadoopPutConfig, - LOCAL_DPP_DIR + "/" + file.getName(), hadoopDppDir); - LOG.info(hadoopPutCmd); - putResult = Util.executeCommand(hadoopPutCmd); - if (putResult.getReturnCode() != 0) { - LOG.warn("hadoop put fail. result: {}", putResult); - throw new LoadException("hadoop put fail. 
msg: " + putResult.getStderr()); - } - } - } - } - - private Set getInputPaths(Map jobConf) { - Set inputPaths = new HashSet(); - Map tables = (Map) jobConf.get("tables"); - for (Map table : tables.values()) { - Map sourceFileSchema = (Map) table.get("source_file_schema"); - for (Map> schema : sourceFileSchema.values()) { - List fileUrls = schema.get("file_urls"); - inputPaths.addAll(fileUrls); - } - } - return inputPaths; - } - - private int calcReduceNumByInputSize(Set inputPaths) throws InputSizeInvalidException { - int reduceNum = 0; - String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig, - StringUtils.join(inputPaths, " ")); - LOG.info(hadoopCountCmd); - CommandResult result = Util.executeCommand(hadoopCountCmd); - if (result.getReturnCode() != 0) { - LOG.warn("hadoop count error, result: {}", result); - return DEFAULT_REDUCE_NUM; - } - - // calc total size - long totalSizeB = 0L; - String[] fileInfos = result.getStdout().split("\n"); - for (String fileInfo : fileInfos) { - String[] fileInfoArr = fileInfo.trim().split(" +"); - if (fileInfoArr.length == 4) { - totalSizeB += Long.parseLong(fileInfoArr[2]); - } - } - - // check input size limit - int inputSizeLimitGB = Config.load_input_size_limit_gb; - if (inputSizeLimitGB != 0) { - if (totalSizeB > inputSizeLimitGB * GB) { - String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]" - + " exceeds system limit[" + inputSizeLimitGB + "GB]"; - LOG.warn(failMsg); - throw new InputSizeInvalidException(failMsg); - } - } - - if (totalSizeB != 0) { - reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1; - } - return reduceNum; - } - - private int calcReduceNumByTablet(Map jobConf) { - int reduceNum = 0; - Map tables = (Map) jobConf.get("tables"); - for (Map table : tables.values()) { - Map views = (Map) table.get("views"); - for (Map view : views.values()) { - if (view.containsKey("hash_mod")) { - // hash or random - reduceNum += (int) view.get("hash_mod"); - } else if (view.containsKey("key_ranges")) { - // key range - List rangeList = (List) view.get("key_ranges"); - reduceNum += rangeList.size(); - } - } - } - return reduceNum; - } - - public EtlStatus getEtlJobStatus(String etlJobId) { - EtlStatus status = new EtlStatus(); - status.setState(TEtlState.RUNNING); - String hadoopStatusCmd = String.format(HADOOP_STATUS_CMD, HADOOP_CLIENT, hadoopConfig, etlJobId); - LOG.info(hadoopStatusCmd); - - CommandResult result = Util.executeCommand(hadoopStatusCmd); - String stdout = result.getStdout(); - if (result.getReturnCode() != 0) { - if (stdout != null && stdout.contains("Could not find job")) { - LOG.warn("cannot find hadoop etl job: {}", etlJobId); - status.setState(TEtlState.CANCELLED); - } - return status; - } - - // stats and counters - Map stats = new HashMap(); - Map counters = new HashMap(); - String[] stdoutLines = stdout.split("\n"); - String[] array = null; - for (String line : stdoutLines) { - array = line.split(":"); - if (array.length == 2) { - stats.put(array[0].trim(), array[1].trim()); - } - - array = line.split("="); - if (array.length == 2) { - counters.put(array[0].trim(), array[1].trim()); - } - } - status.setStats(stats); - status.setCounters(counters); - - // tracking url - for (String key : counters.keySet()) { - if (key.startsWith("tracking URL")) { - // remove "tracking URL: ", total 14 chars - status.setTrackingUrl(key.substring(14) + "=" + counters.get(key)); - break; - } - } - - // job state - if (stats.containsKey("job state")) { - int jobState = 
Integer.parseInt(stats.get("job state")); - if (jobState == 3 || jobState == 5 || jobState == 6) { - // 3:failed 5or6:killed --> cancelled - status.setState(TEtlState.CANCELLED); - } else if (jobState == 2) { - // 2:success --> finished - status.setState(TEtlState.FINISHED); - } else { - // 0:init 1:running 4:prepare --> running - status.setState(TEtlState.RUNNING); - } - } - - return status; - } - - public Map getEtlFiles(String outputPath) { - Map fileMap = Maps.newHashMap(); - - String fileDir = outputPath + "/" + DPP_OUTPUT_DIR; - String hadoopLsCmd = String.format(HADOOP_LS_CMD, HADOOP_CLIENT, hadoopConfig, fileDir); - LOG.info(hadoopLsCmd); - CommandResult lsResult = Util.executeCommand(hadoopLsCmd); - if (lsResult.getReturnCode() != 0) { - // check outputPath exist - String hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", outputPath); - LOG.info(hadoopTestCmd); - CommandResult testResult = Util.executeCommand(hadoopTestCmd); - if (testResult.getReturnCode() != 0) { - LOG.info("hadoop dir does not exist. dir: {}", outputPath); - return null; - } - - // check outputPath + DPP_OUTPUT_DIR exist - hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", fileDir); - LOG.info(hadoopTestCmd); - testResult = Util.executeCommand(hadoopTestCmd); - if (testResult.getReturnCode() != 0) { - LOG.info("hadoop dir does not exist. dir: {}", fileDir); - return fileMap; - } else { - return null; - } - } - - String stdout = lsResult.getStdout(); - String[] lsFileResults = stdout.split("\n"); - for (String line : lsFileResults) { - // drwxr-xr-x 3 palo palo 0 2014-12-08 14:37 /tmp/file - String[] fileInfos = line.split(" +"); - if (fileInfos.length == 8) { - String filePath = fileInfos[fileInfos.length - 1]; - long fileSize = -1; - try { - fileSize = Long.parseLong(fileInfos[4]); - } catch (NumberFormatException e) { - LOG.warn("file size format error. 
line: {}", line); - } - - fileMap.put(filePath, fileSize); - } - } - return fileMap; - } - - public void killEtlJob(String etlJobId) { - String hadoopKillCmd = String.format(HADOOP_KILL_CMD, HADOOP_CLIENT, hadoopConfig, etlJobId); - LOG.info(hadoopKillCmd); - Util.executeCommand(hadoopKillCmd); - } - - public void deleteEtlOutputPath(String outputPath) { - String hadoopRmCmd = String.format(HADOOP_RMR_CMD, HADOOP_CLIENT, hadoopConfig, outputPath); - LOG.info(hadoopRmCmd); - Util.executeCommand(hadoopRmCmd); - } - - public static String getEtlOutputPath(String fsDefaultName, String outputPath, long dbId, String loadLabel, - String etlOutputDir) { - return String.format(ETL_OUTPUT_PATH, fsDefaultName, outputPath, dbId, loadLabel, etlOutputDir); - } - - private class InputSizeInvalidException extends LoadException { - public InputSizeInvalidException(String msg) { - super(msg); - } - } - -} +package com.baidu.palo.load; + +import com.baidu.palo.common.Config; +import com.baidu.palo.common.FeConstants; +import com.baidu.palo.common.LoadException; +import com.baidu.palo.common.util.CommandResult; +import com.baidu.palo.common.util.Util; +import com.baidu.palo.thrift.TEtlState; +import com.baidu.palo.thrift.TStatus; +import com.baidu.palo.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; + +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +public class DppScheduler { + private static final Logger LOG = LogManager.getLogger(DppScheduler.class); + + private static final String PALO_HOME = System.getenv("PALO_HOME"); + private static final String HADOOP_CLIENT = PALO_HOME + Config.dpp_hadoop_client_path; + private static final String DPP_OUTPUT_DIR = "export"; + private static final String JOB_CONFIG_DIR = PALO_HOME + "/temp/job_conf"; + private static final String JOB_CONFIG_FILE = "jobconfig.json"; + private static final String LOCAL_DPP_DIR = PALO_HOME + "/lib/dpp/" + FeConstants.dpp_version; + private static final int DEFAULT_REDUCE_NUM = 1000; + private static final long GB = 1024 * 1024 * 1024L; + + // hdfs://host:port/outputPath/dbId/loadLabel/etlOutputDir + private static final String ETL_OUTPUT_PATH = "%s%s/%d/%s/%s"; + private static final String ETL_JOB_NAME = "palo2__%s__%s"; + + // hadoop command + private static final String HADOOP_BISTREAMING_CMD = "%s bistreaming %s -D mapred.job.name=\"%s\" " + + "-input %s -output %s " + + "-mapper \"sh mapred/mapper.sh\" " + + "-reducer \"sh mapred/reducer.sh '\\\"%s\\\"'\" " + + "-partitioner com.baidu.sos.mapred.lib.MapIntPartitioner " + + "-cacheArchive %s/dpp/x86_64-scm-linux-gnu.tar.gz#tc " + + "-cacheArchive %s/dpp/pypy.tar.gz#pypy " + + "-cacheArchive %s/dpp/palo_dpp_mr.tar.gz#mapred " + + "-numReduceTasks %d -file \"%s\" "; + private static final String HADOOP_STATUS_CMD = "%s job %s -status %s"; + private static final String HADOOP_KILL_CMD = "%s job %s -kill %s"; + private 
static final String HADOOP_LS_CMD = "%s fs %s -ls %s"; + private static final String HADOOP_COUNT_CMD = "%s fs %s -count %s"; + private static final String HADOOP_TEST_CMD = "%s fs %s -test %s %s"; + private static final String HADOOP_MKDIR_CMD = "%s fs %s -mkdir %s"; + private static final String HADOOP_RMR_CMD = "%s fs %s -rmr %s"; + private static final String HADOOP_PUT_CMD = "%s fs %s -put %s %s"; + private static final long HADOOP_SPEED_LIMIT_KB = 10240L; // 10M + + private static final ConcurrentMap DPP_LOCK_MAP = Maps.newConcurrentMap(); + + private String hadoopConfig; + private String applicationsPath; + + public DppScheduler(DppConfig dppConfig) { + hadoopConfig = getHadoopConfigsStr(dppConfig.getHadoopConfigs()); + applicationsPath = dppConfig.getFsDefaultName() + dppConfig.getApplicationsPath(); + } + + private String getHadoopConfigsStr(Map hadoopConfigs) { + List configs = Lists.newArrayList(); + for (Map.Entry entry : hadoopConfigs.entrySet()) { + configs.add(String.format("%s=%s", entry.getKey(), entry.getValue())); + } + return String.format("-D %s", StringUtils.join(configs, " -D ")); + } + + public EtlSubmitResult submitEtlJob(long jobId, String loadLabel, String clusterName, + String dbName, Map jobConf, int retry) { + String etlJobId = null; + TStatus status = new TStatus(); + status.setStatus_code(TStatusCode.OK); + List failMsgs = new ArrayList(); + status.setError_msgs(failMsgs); + + // check dpp lock map + if (retry > 0) { + // failed once, try check dpp application + LOG.warn("submit etl retry[{}] > 0. check dpp application", retry); + // prepare dpp applications + DPP_LOCK_MAP.putIfAbsent(clusterName, new Object()); + Preconditions.checkState(DPP_LOCK_MAP.containsKey(clusterName)); + synchronized (DPP_LOCK_MAP.get(clusterName)) { + try { + prepareDppApplications(); + } catch (LoadException e) { + return null; + } + } + } + + // create job config file + String configDirPath = JOB_CONFIG_DIR + "/" + jobId; + File configDir = new File(configDirPath); + if (!Util.deleteDirectory(configDir)) { + LOG.warn("delete config dir error. job[{}]", jobId); + return null; + } + if (!configDir.mkdirs()) { + LOG.warn("create config file dir error. job[{}]", jobId); + return null; + } + File configFile = new File(configDirPath + "/" + JOB_CONFIG_FILE); + BufferedWriter bw = null; + try { + bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(configFile), "UTF-8")); + Gson gson = new Gson(); + bw.write(gson.toJson(jobConf)); + bw.flush(); + } catch (IOException e) { + Util.deleteDirectory(configDir); + LOG.warn("create config file error. job[" + jobId + "]", e); + return null; + } finally { + if (bw != null) { + try { + bw.close(); + } catch (IOException e) { + LOG.warn("close buffered writer error", e); + return null; + } + } + } + + // create input path + Set inputPaths = getInputPaths(jobConf); + String inputPath = StringUtils.join(inputPaths, " -input "); + + // reduce num + int reduceNumByInputSize = 0; + try { + reduceNumByInputSize = calcReduceNumByInputSize(inputPaths); + } catch (InputSizeInvalidException e) { + failMsgs.add(e.getMessage()); + status.setStatus_code(TStatusCode.CANCELLED); + return new EtlSubmitResult(status, null); + } + int reduceNumByTablet = calcReduceNumByTablet(jobConf); + int reduceNum = Math.min(reduceNumByInputSize, reduceNumByTablet); + LOG.debug("calculate reduce num. 
reduceNum: {}, reduceNumByInputSize: {}, reduceNumByTablet: {}", + reduceNum, reduceNumByInputSize, reduceNumByTablet); + + // rm path + String outputPath = (String) jobConf.get("output_path"); + deleteEtlOutputPath(outputPath); + + // submit etl job + String etlJobName = String.format(ETL_JOB_NAME, dbName, loadLabel); + String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName, inputPath, + outputPath, hadoopConfig, applicationsPath, applicationsPath, applicationsPath, reduceNum, + configFile.getAbsolutePath()); + LOG.info(hadoopRunCmd); + String outputLine = null; + List hadoopRunCmdList = Util.shellSplit(hadoopRunCmd); + String[] hadoopRunCmds = hadoopRunCmdList.toArray(new String[0]); + BufferedReader errorReader = null; + long startTime = System.currentTimeMillis(); + try { + Process p = Runtime.getRuntime().exec(hadoopRunCmds); + errorReader = new BufferedReader(new InputStreamReader(p.getErrorStream())); + for (int i = 0; i < 1000; i++) { + outputLine = errorReader.readLine(); + LOG.info(outputLine); + if (Strings.isNullOrEmpty(outputLine)) { + LOG.warn("submit etl job fail. job id: {}, label: {}", jobId, loadLabel); + break; + } + + if (outputLine.toLowerCase().contains("error") + || outputLine.toLowerCase().contains("exception")) { + failMsgs.add(outputLine); + } + + if (outputLine.indexOf("Running job") != -1) { + String[] arr = outputLine.split(":"); + etlJobId = arr[arr.length - 1].trim(); + p.destroy(); + break; + } + } + } catch (IOException e) { + LOG.warn("submit etl job error", e); + return null; + } finally { + Util.deleteDirectory(configDir); + long endTime = System.currentTimeMillis(); + LOG.info("finished submit hadoop job: {}. cost: {} ms", jobId, endTime - startTime); + if (errorReader != null) { + try { + errorReader.close(); + } catch (IOException e) { + LOG.warn("close buffered reader error", e); + return null; + } + } + } + + if (etlJobId == null) { + status.setStatus_code(TStatusCode.CANCELLED); + } + return new EtlSubmitResult(status, etlJobId); + } + + private void prepareDppApplications() throws LoadException { + String hadoopDppDir = applicationsPath + "/dpp"; + boolean needUpload = false; + + // get local files + File dppDir = new File(LOCAL_DPP_DIR); + if (!dppDir.exists() || !dppDir.isDirectory()) { + LOG.warn("dpp dir does not exist"); + throw new LoadException("dpp dir does not exist"); + } + File[] localFiles = dppDir.listFiles(); + + // test hadoop dpp dir + String hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", hadoopDppDir); + LOG.info(hadoopTestCmd); + CommandResult testResult = Util.executeCommand(hadoopTestCmd); + if (testResult.getReturnCode() == 0) { + String hadoopDppFilePath = hadoopDppDir + "/*"; + String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppFilePath); + LOG.info(hadoopCountCmd); + CommandResult countResult = Util.executeCommand(hadoopCountCmd); + if (countResult.getReturnCode() != 0) { + LOG.warn("hadoop count error, result: {}", countResult); + throw new LoadException("hadoop count error. 
msg: " + countResult.getStderr()); + } + + Map fileMap = Maps.newHashMap(); + String[] fileInfos = countResult.getStdout().split("\n"); + for (String fileInfo : fileInfos) { + String[] fileInfoArr = fileInfo.trim().split(" +"); + if (fileInfoArr.length == 4) { + String filePath = fileInfoArr[3]; + String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); + long size = Long.parseLong(fileInfoArr[2]); + fileMap.put(fileName, size); + } + } + + // diff files + for (File file : localFiles) { + if (!file.isFile()) { + continue; + } + + String fileName = file.getName(); + if (!fileMap.containsKey(fileName)) { + LOG.info("hadoop dpp file does not exist. file: {}", fileName); + needUpload = true; + break; + } + + long localSize = file.length(); + long hadoopSize = fileMap.get(fileName); + if (localSize != hadoopSize) { + LOG.info("dpp files size are different. file: {}, local: {}, hadoop: {}", fileName, localSize, + hadoopSize); + needUpload = true; + break; + } + } + } else { + LOG.info("hadoop dir does not exist. dir: {}", hadoopDppDir); + needUpload = true; + } + + if (needUpload) { + // rmdir and mkdir + String hadoopRmrCmd = String.format(HADOOP_RMR_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppDir); + LOG.info(hadoopRmrCmd); + Util.executeCommand(hadoopRmrCmd); + String hadoopMkdirCmd = String.format(HADOOP_MKDIR_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppDir); + LOG.info(hadoopMkdirCmd); + Util.executeCommand(hadoopMkdirCmd); + + // upload dpp applications + String hadoopPutConfig = hadoopConfig + String.format(" -D speed.limit.kb=%d", HADOOP_SPEED_LIMIT_KB); + String hadoopPutCmd = null; + CommandResult putResult = null; + for (File file : localFiles) { + hadoopPutCmd = String.format(HADOOP_PUT_CMD, HADOOP_CLIENT, hadoopPutConfig, + LOCAL_DPP_DIR + "/" + file.getName(), hadoopDppDir); + LOG.info(hadoopPutCmd); + putResult = Util.executeCommand(hadoopPutCmd); + if (putResult.getReturnCode() != 0) { + LOG.warn("hadoop put fail. result: {}", putResult); + throw new LoadException("hadoop put fail. 
msg: " + putResult.getStderr()); + } + } + } + } + + private Set getInputPaths(Map jobConf) { + Set inputPaths = new HashSet(); + Map tables = (Map) jobConf.get("tables"); + for (Map table : tables.values()) { + Map sourceFileSchema = (Map) table.get("source_file_schema"); + for (Map> schema : sourceFileSchema.values()) { + List fileUrls = schema.get("file_urls"); + inputPaths.addAll(fileUrls); + } + } + return inputPaths; + } + + private int calcReduceNumByInputSize(Set inputPaths) throws InputSizeInvalidException { + int reduceNum = 0; + String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig, + StringUtils.join(inputPaths, " ")); + LOG.info(hadoopCountCmd); + CommandResult result = Util.executeCommand(hadoopCountCmd); + if (result.getReturnCode() != 0) { + LOG.warn("hadoop count error, result: {}", result); + return DEFAULT_REDUCE_NUM; + } + + // calc total size + long totalSizeB = 0L; + String[] fileInfos = result.getStdout().split("\n"); + for (String fileInfo : fileInfos) { + String[] fileInfoArr = fileInfo.trim().split(" +"); + if (fileInfoArr.length == 4) { + totalSizeB += Long.parseLong(fileInfoArr[2]); + } + } + + // check input size limit + int inputSizeLimitGB = Config.load_input_size_limit_gb; + if (inputSizeLimitGB != 0) { + if (totalSizeB > inputSizeLimitGB * GB) { + String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]" + + " exceeds system limit[" + inputSizeLimitGB + "GB]"; + LOG.warn(failMsg); + throw new InputSizeInvalidException(failMsg); + } + } + + if (totalSizeB != 0) { + reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1; + } + return reduceNum; + } + + private int calcReduceNumByTablet(Map jobConf) { + int reduceNum = 0; + Map tables = (Map) jobConf.get("tables"); + for (Map table : tables.values()) { + Map views = (Map) table.get("views"); + for (Map view : views.values()) { + if (view.containsKey("hash_mod")) { + // hash or random + reduceNum += (int) view.get("hash_mod"); + } else if (view.containsKey("key_ranges")) { + // key range + List rangeList = (List) view.get("key_ranges"); + reduceNum += rangeList.size(); + } + } + } + return reduceNum; + } + + public EtlStatus getEtlJobStatus(String etlJobId) { + EtlStatus status = new EtlStatus(); + status.setState(TEtlState.RUNNING); + String hadoopStatusCmd = String.format(HADOOP_STATUS_CMD, HADOOP_CLIENT, hadoopConfig, etlJobId); + LOG.info(hadoopStatusCmd); + + CommandResult result = Util.executeCommand(hadoopStatusCmd); + String stdout = result.getStdout(); + if (result.getReturnCode() != 0) { + if (stdout != null && stdout.contains("Could not find job")) { + LOG.warn("cannot find hadoop etl job: {}", etlJobId); + status.setState(TEtlState.CANCELLED); + } + return status; + } + + // stats and counters + Map stats = new HashMap(); + Map counters = new HashMap(); + String[] stdoutLines = stdout.split("\n"); + String[] array = null; + for (String line : stdoutLines) { + array = line.split(":"); + if (array.length == 2) { + stats.put(array[0].trim(), array[1].trim()); + } + + array = line.split("="); + if (array.length == 2) { + counters.put(array[0].trim(), array[1].trim()); + } + } + status.setStats(stats); + status.setCounters(counters); + + // tracking url + for (String key : counters.keySet()) { + if (key.startsWith("tracking URL")) { + // remove "tracking URL: ", total 14 chars + status.setTrackingUrl(key.substring(14) + "=" + counters.get(key)); + break; + } + } + + // job state + if (stats.containsKey("job state")) { + int jobState = 
Integer.parseInt(stats.get("job state")); + if (jobState == 3 || jobState == 5 || jobState == 6) { + // 3:failed 5or6:killed --> cancelled + status.setState(TEtlState.CANCELLED); + } else if (jobState == 2) { + // 2:success --> finished + status.setState(TEtlState.FINISHED); + } else { + // 0:init 1:running 4:prepare --> running + status.setState(TEtlState.RUNNING); + } + } + + return status; + } + + public Map getEtlFiles(String outputPath) { + Map fileMap = Maps.newHashMap(); + + String fileDir = outputPath + "/" + DPP_OUTPUT_DIR; + String hadoopLsCmd = String.format(HADOOP_LS_CMD, HADOOP_CLIENT, hadoopConfig, fileDir); + LOG.info(hadoopLsCmd); + CommandResult lsResult = Util.executeCommand(hadoopLsCmd); + if (lsResult.getReturnCode() != 0) { + // check outputPath exist + String hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", outputPath); + LOG.info(hadoopTestCmd); + CommandResult testResult = Util.executeCommand(hadoopTestCmd); + if (testResult.getReturnCode() != 0) { + LOG.info("hadoop dir does not exist. dir: {}", outputPath); + return null; + } + + // check outputPath + DPP_OUTPUT_DIR exist + hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", fileDir); + LOG.info(hadoopTestCmd); + testResult = Util.executeCommand(hadoopTestCmd); + if (testResult.getReturnCode() != 0) { + LOG.info("hadoop dir does not exist. dir: {}", fileDir); + return fileMap; + } else { + return null; + } + } + + String stdout = lsResult.getStdout(); + String[] lsFileResults = stdout.split("\n"); + for (String line : lsFileResults) { + // drwxr-xr-x 3 palo palo 0 2014-12-08 14:37 /tmp/file + String[] fileInfos = line.split(" +"); + if (fileInfos.length == 8) { + String filePath = fileInfos[fileInfos.length - 1]; + long fileSize = -1; + try { + fileSize = Long.parseLong(fileInfos[4]); + } catch (NumberFormatException e) { + LOG.warn("file size format error. 
line: {}", line); + } + + fileMap.put(filePath, fileSize); + } + } + return fileMap; + } + + public void killEtlJob(String etlJobId) { + String hadoopKillCmd = String.format(HADOOP_KILL_CMD, HADOOP_CLIENT, hadoopConfig, etlJobId); + LOG.info(hadoopKillCmd); + Util.executeCommand(hadoopKillCmd); + } + + public void deleteEtlOutputPath(String outputPath) { + String hadoopRmCmd = String.format(HADOOP_RMR_CMD, HADOOP_CLIENT, hadoopConfig, outputPath); + LOG.info(hadoopRmCmd); + Util.executeCommand(hadoopRmCmd); + } + + public static String getEtlOutputPath(String fsDefaultName, String outputPath, long dbId, String loadLabel, + String etlOutputDir) { + return String.format(ETL_OUTPUT_PATH, fsDefaultName, outputPath, dbId, loadLabel, etlOutputDir); + } + + private class InputSizeInvalidException extends LoadException { + public InputSizeInvalidException(String msg) { + super(msg); + } + } + +} diff --git a/fe/src/com/baidu/palo/load/Load.java b/fe/src/com/baidu/palo/load/Load.java index bc031ffbbcd05b..95aff6c151f25a 100644 --- a/fe/src/com/baidu/palo/load/Load.java +++ b/fe/src/com/baidu/palo/load/Load.java @@ -15,25 +15,6 @@ package com.baidu.palo.load; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.commons.lang.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.baidu.palo.analysis.BinaryPredicate; import com.baidu.palo.analysis.CancelLoadStmt; import com.baidu.palo.analysis.ColumnSeparator; @@ -90,11 +71,12 @@ import com.baidu.palo.task.AgentTaskQueue; import com.baidu.palo.task.CancelDeleteTask; import com.baidu.palo.task.PushTask; -import com.baidu.palo.thrift.TEtlState; import com.baidu.palo.thrift.TMiniLoadRequest; +import com.baidu.palo.thrift.TEtlState; import com.baidu.palo.thrift.TNetworkAddress; import com.baidu.palo.thrift.TPriority; import com.baidu.palo.thrift.TPushType; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -103,6 +85,25 @@ import com.google.common.collect.Sets; import com.google.gson.Gson; +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + public class Load { private static final Logger LOG = LogManager.getLogger(Load.class); diff --git a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java index 9595d46467b637..a2b5fb66d63b08 100644 --- a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java +++ b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java @@ -623,7 +623,7 @@ private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer, long for (TableRef tblRef : 
selectStmt.getTableRefs()) { rowTuples.addAll(tblRef.getMaterializedTupleIds()); } - + // create left-deep sequence of binary hash joins; assign node ids as we go along TableRef tblRef = selectStmt.getTableRefs().get(0); PlanNode root = createTableRefNode(analyzer, tblRef); diff --git a/fe/src/com/baidu/palo/service/FrontendOptions.java b/fe/src/com/baidu/palo/service/FrontendOptions.java index 2cdf5cea13f697..7e8fbea547ea84 100644 --- a/fe/src/com/baidu/palo/service/FrontendOptions.java +++ b/fe/src/com/baidu/palo/service/FrontendOptions.java @@ -17,8 +17,6 @@ import java.net.Inet4Address; import java.net.InetAddress; -import java.net.SocketException; -import java.net.UnknownHostException; import java.util.List; import java.util.ArrayList; diff --git a/fe/src/com/baidu/palo/system/SystemInfoService.java b/fe/src/com/baidu/palo/system/SystemInfoService.java index 59655126a43409..632c806ea6740b 100644 --- a/fe/src/com/baidu/palo/system/SystemInfoService.java +++ b/fe/src/com/baidu/palo/system/SystemInfoService.java @@ -600,10 +600,10 @@ public List calculateExpansionBackends(String clusterName, int expansionNu */ public List getClusterBackends(String name) { final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - final List ret = new ArrayList(); + final List ret = Lists.newArrayList(); if (Strings.isNullOrEmpty(name)) { - return null; + return ret; } for (Backend backend : copiedBackends.values()) { @@ -987,7 +987,7 @@ public static Pair validateHostAndPort(String hostPort) throws public void replayAddBackend(Backend newBackend) { // update idToBackend - if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_23) { + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_30) { newBackend.setOwnerClusterName(DEFAULT_CLUSTER); newBackend.setBackendState(BackendState.using); } From 12e8c7fa6b0e553997e62716ee45f0e646b34bce Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 4 Sep 2017 17:47:03 +0800 Subject: [PATCH 4/5] change MASTER_CLIENT_TIMEOUT --- be/src/agent/utils.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h index e00c3e686ce75f..03da3f7132a1e9 100644 --- a/be/src/agent/utils.h +++ b/be/src/agent/utils.h @@ -32,7 +32,7 @@ namespace palo { -const uint32_t MASTER_CLIENT_TIMEOUT = 500; +const uint32_t MASTER_CLIENT_TIMEOUT = 3000; // client cache // All service client should be defined in client_cache.h From 9e79616656ffd2bd2a3bc78cddf10fe5b0a643e0 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 4 Sep 2017 17:50:48 +0800 Subject: [PATCH 5/5] fix right outer join with null column --- be/src/exec/hash_join_node.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 7c176ab814a612..167e4101cebc9e 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -134,9 +134,13 @@ Status HashJoinNode::prepare(RuntimeState* state) { _build_tuple_row_size = num_build_tuples * sizeof(Tuple*); // TODO: default buckets + const bool stores_nulls = _join_op == TJoinOp::RIGHT_OUTER_JOIN + || _join_op == TJoinOp::FULL_OUTER_JOIN + || _join_op == TJoinOp::RIGHT_ANTI_JOIN + || _join_op == TJoinOp::RIGHT_SEMI_JOIN; _hash_tbl.reset(new HashTable( _build_expr_ctxs, _probe_expr_ctxs, _build_tuple_size, - false, id(), mem_tracker(), 1024)); + stores_nulls, id(), mem_tracker(), 1024)); _probe_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
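
Rationale for the stores_nulls change in hash_join_node.cpp above: for right/full outer and right semi/anti joins the build side must also surface rows whose join keys are NULL, so the hash table has to retain those rows instead of discarding them as it safely can for inner and left joins. Below is a minimal standalone sketch of the same predicate; the TJoinOp enum here is a hypothetical stand-in for the thrift-generated type used by the backend, not the real definition.

#include <cstdio>

// Assumed subset of the thrift-generated TJoinOp enum referenced in the patch.
enum class TJoinOp {
    INNER_JOIN,
    LEFT_OUTER_JOIN,
    RIGHT_OUTER_JOIN,
    FULL_OUTER_JOIN,
    RIGHT_SEMI_JOIN,
    RIGHT_ANTI_JOIN
};

// Mirrors the condition added in HashJoinNode::prepare(): build-side NULL keys
// must be kept whenever the join may emit (or filter on) unmatched build rows.
static bool stores_nulls(TJoinOp op) {
    return op == TJoinOp::RIGHT_OUTER_JOIN
        || op == TJoinOp::FULL_OUTER_JOIN
        || op == TJoinOp::RIGHT_ANTI_JOIN
        || op == TJoinOp::RIGHT_SEMI_JOIN;
}

int main() {
    std::printf("right outer join stores nulls: %d\n", stores_nulls(TJoinOp::RIGHT_OUTER_JOIN)); // 1
    std::printf("inner join stores nulls:       %d\n", stores_nulls(TJoinOp::INNER_JOIN));       // 0
    return 0;
}

Under this sketch only the four right/full variants return true, which is the same set passed as stores_nulls to the HashTable constructor in the patch.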