From 68ca4d8cfdcba338ce0baf5428204db43d8acc3b Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 29 Mar 2024 18:44:57 +0800 Subject: [PATCH 1/2] 1 --- .../datasource/tvf/source/TVFScanNode.java | 12 ++++-- .../doris/system/BeSelectionPolicy.java | 1 + .../LocalTableValuedFunction.java | 40 +++++++++++++++++-- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index dcd248a9782206..96e96d3cf19a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -73,11 +73,15 @@ protected void initBackendPolicy() throws UserException { if (tableValuedFunction instanceof LocalTableValuedFunction) { // For local tvf, the backend was specified by backendId Long backendId = ((LocalTableValuedFunction) tableValuedFunction).getBackendId(); - Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); - if (backend == null) { - throw new UserException("Backend " + backendId + " does not exist"); + if (backendId != -1) { + // User has specified the backend, only use that backend + // Otherwise, use all backends for shared storage. + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + if (backend == null) { + throw new UserException("Backend " + backendId + " does not exist"); + } + preferLocations.add(backend.getHost()); } - preferLocations.add(backend.getHost()); } backendPolicy.init(preferLocations); numNodes = backendPolicy.numBackends(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index dfb1ffeaae03cb..ece95f1b00755e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -36,6 +36,7 @@ */ public class BeSelectionPolicy { private static final Logger LOG = LogManager.getLogger(BeSelectionPolicy.class); + public boolean needScheduleAvailable = false; public boolean needQueryAvailable = false; public boolean needLoadAvailable = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java index 35eca403050b1a..5c79986179b46d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java @@ -30,10 +30,13 @@ import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -47,13 +50,19 @@ public class LocalTableValuedFunction extends ExternalFileTableValuedFunction { public static final String NAME = "local"; public static final String PROP_FILE_PATH = "file_path"; public static final String PROP_BACKEND_ID = "backend_id"; + public static final String PROP_SHARED_STORAGE = "shared_storage"; private static final ImmutableSet LOCATION_PROPERTIES = new ImmutableSet.Builder() .add(PROP_FILE_PATH) - .add(PROP_BACKEND_ID) .build(); + // This backend is user specified backend for listing files, fetching file schema and executing query. private long backendId; + // This backend if for listing files and fetching file schema. + // If "backendId" is set, "backendIdForRequest" will be set to "backendId", + // otherwise, "backendIdForRequest" will be set to one of the available backends. + private long backendIdForRequest = -1; + private boolean sharedStorage = false; public LocalTableValuedFunction(Map properties) throws AnalysisException { // 1. analyze common properties @@ -66,14 +75,37 @@ public LocalTableValuedFunction(Map properties) throws AnalysisE } } filePath = otherProps.get(PROP_FILE_PATH); - backendId = Long.parseLong(otherProps.get(PROP_BACKEND_ID)); + backendId = Long.parseLong(otherProps.getOrDefault(PROP_BACKEND_ID, "-1")); + sharedStorage = Boolean.parseBoolean(otherProps.getOrDefault(PROP_SHARED_STORAGE, "false")); + + // If not shared storage, backend_id is required + // If is shared storage, backend_id can still be set, so that we can query data on single BE node. + // Otherwise, if shared storage is true, we may use multi BE nodes. + if (backendId == -1 && !sharedStorage) { + throw new AnalysisException("'backend_id' is required when 'shared_storage' is false."); + } else if (backendId != -1 && sharedStorage) { + throw new AnalysisException("'shared_storage' should be false when 'backend_id' is set."); + } // 3. parse file getFileListFromBackend(); } private void getFileListFromBackend() throws AnalysisException { - Backend be = Env.getCurrentSystemInfo().getBackend(backendId); + Backend be = null; + if (backendId != -1) { + be = Env.getCurrentSystemInfo().getBackend(backendId); + backendIdForRequest = backendId; + } else { + Preconditions.checkState(sharedStorage); + List beIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (beIds.isEmpty()) { + throw new AnalysisException("No available backend"); + } + Collections.shuffle(beIds); + be = Env.getCurrentSystemInfo().getBackend(beIds.get(0)); + backendIdForRequest = be.getId(); + } if (be == null) { throw new AnalysisException("backend not found with backend_id = " + backendId); } @@ -125,6 +157,6 @@ public Long getBackendId() { @Override protected Backend getBackend() { - return Env.getCurrentSystemInfo().getBackend(backendId); + return Env.getCurrentSystemInfo().getBackend(backendIdForRequest); } } From 224f6b29f9cd1fd90721969989961a14e2b95ef7 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 29 Mar 2024 22:03:53 +0800 Subject: [PATCH 2/2] 1 --- .../apache/doris/tablefunction/LocalTableValuedFunction.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java index 5c79986179b46d..df1bf6b61b2d91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java @@ -83,8 +83,6 @@ public LocalTableValuedFunction(Map properties) throws AnalysisE // Otherwise, if shared storage is true, we may use multi BE nodes. if (backendId == -1 && !sharedStorage) { throw new AnalysisException("'backend_id' is required when 'shared_storage' is false."); - } else if (backendId != -1 && sharedStorage) { - throw new AnalysisException("'shared_storage' should be false when 'backend_id' is set."); } // 3. parse file