From eab3457171ddabd798b69e5d6ff08afc563ac9e9 Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Tue, 29 Aug 2017 11:26:37 +0800 Subject: [PATCH] query and pull load select an backend that does't belong to the cluster which it is located --- fe/src/com/baidu/palo/load/ExportJob.java | 7 ++++++- fe/src/com/baidu/palo/qe/Coordinator.java | 7 +++++-- .../baidu/palo/system/SystemInfoService.java | 17 ++++++++++++++++- fe/src/com/baidu/palo/task/PullLoadTask.java | 2 +- 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/fe/src/com/baidu/palo/load/ExportJob.java b/fe/src/com/baidu/palo/load/ExportJob.java index bd754a080031da..05c1dbc10f6a01 100644 --- a/fe/src/com/baidu/palo/load/ExportJob.java +++ b/fe/src/com/baidu/palo/load/ExportJob.java @@ -89,6 +89,7 @@ public enum JobState { private long id; private long dbId; + private String clusterName; private long tableId; private BrokerDesc brokerDesc; private String exportPath; @@ -286,7 +287,7 @@ private List genCoordinators(List fragments, List fragments, List scanNodes) { + List fragments, List scanNodes, String cluster) { this.isBlockQuery = true; this.queryId = queryId; this.descTable = descTable.toThrift(); @@ -194,6 +196,7 @@ public Coordinator(TUniqueId queryId, DescriptorTable descTable, this.queryGlobals.setNow_string(DATE_FORMAT.format(new Date())); this.tResourceInfo = new TResourceInfo("", ""); this.needReport = true; + this.clusterName = cluster; } public TUniqueId getQueryId() { @@ -251,7 +254,7 @@ private void prepare() { queryProfile.addChild(fragmentProfile.get(i)); } - this.idToBackend = Catalog.getCurrentSystemInfo().getIdToBackend(); + this.idToBackend = Catalog.getCurrentSystemInfo().getClusterIdToBackend(clusterName); if (LOG.isDebugEnabled()) { LOG.debug("idToBackend size={}", idToBackend.size()); for (Map.Entry entry : idToBackend.entrySet()) { diff --git a/fe/src/com/baidu/palo/system/SystemInfoService.java b/fe/src/com/baidu/palo/system/SystemInfoService.java index 583dfc0345c52c..91e9c8bafc10f2 100644 --- a/fe/src/com/baidu/palo/system/SystemInfoService.java +++ b/fe/src/com/baidu/palo/system/SystemInfoService.java @@ -861,7 +861,22 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA public ImmutableMap getIdToBackend() { return idToBackendRef.get(); } - + + + public ImmutableMap getClusterIdToBackend(String cluster) { + if (Strings.isNullOrEmpty(cluster)) { + return idToBackendRef.get(); + } + + Map retMaps = Maps.newHashMap(); + for (Backend backend : idToBackendRef.get().values().asList()) { + if (cluster.equals(backend.getOwnerClusterName())) { + retMaps.put(backend.getId(), backend); + } + } + return ImmutableMap.copyOf(retMaps); + } + public long getBackendReportVersion(long backendId) { AtomicLong atomicLong = null; if ((atomicLong = idToReportVersionRef.get().get(backendId)) == null) { diff --git a/fe/src/com/baidu/palo/task/PullLoadTask.java b/fe/src/com/baidu/palo/task/PullLoadTask.java index e018bbb14eb96d..f52f9a3b65e067 100644 --- a/fe/src/com/baidu/palo/task/PullLoadTask.java +++ b/fe/src/com/baidu/palo/task/PullLoadTask.java @@ -202,7 +202,7 @@ public void executeOnce() throws InternalException { UUID uuid = UUID.randomUUID(); executeId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); curCoordinator = new Coordinator(executeId, planner.getDescTable(), - planner.getFragments(), planner.getScanNodes()); + planner.getFragments(), planner.getScanNodes(), db.getClusterName()); curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); }