From 52254461d8513baaef780306e08e11f7465644c8 Mon Sep 17 00:00:00 2001 From: wangbo Date: Thu, 16 Nov 2023 19:55:50 +0800 Subject: [PATCH] select coordinator node from user's tag when exec streaming load (#27106) --- .../java/org/apache/doris/httpv2/rest/LoadAction.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index b5ca33b0583130..09988aad1baff9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -26,6 +26,7 @@ import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.Tag; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; @@ -44,6 +45,7 @@ import org.springframework.web.servlet.view.RedirectView; import java.util.List; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -196,7 +198,12 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) { } private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException { - BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(clusterName).needLoadAvailable().build(); + String qualifiedUser = ConnectContext.get().getQualifiedUser(); + Set userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); + BeSelectionPolicy policy = new BeSelectionPolicy.Builder() + .setCluster(clusterName) + .addTags(userTags) + .needLoadAvailable().build(); List backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); if (backendIds.isEmpty()) { throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);