Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,15 @@ protected void initBackendPolicy() throws UserException {
if (tableValuedFunction instanceof LocalTableValuedFunction) {
// For local tvf, the backend was specified by backendId
Long backendId = ((LocalTableValuedFunction) tableValuedFunction).getBackendId();
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null) {
throw new UserException("Backend " + backendId + " does not exist");
if (backendId != -1) {
// User has specified the backend, only use that backend
// Otherwise, use all backends for shared storage.
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null) {
throw new UserException("Backend " + backendId + " does not exist");
}
preferLocations.add(backend.getHost());
}
preferLocations.add(backend.getHost());
}
backendPolicy.init(preferLocations);
numNodes = backendPolicy.numBackends();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*/
public class BeSelectionPolicy {
private static final Logger LOG = LogManager.getLogger(BeSelectionPolicy.class);

public boolean needScheduleAvailable = false;
public boolean needQueryAvailable = false;
public boolean needLoadAvailable = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -47,13 +50,19 @@ public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
public static final String NAME = "local";
public static final String PROP_FILE_PATH = "file_path";
public static final String PROP_BACKEND_ID = "backend_id";
public static final String PROP_SHARED_STORAGE = "shared_storage";

private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
.add(PROP_FILE_PATH)
.add(PROP_BACKEND_ID)
.build();

// This backend is user specified backend for listing files, fetching file schema and executing query.
private long backendId;
// This backend if for listing files and fetching file schema.
// If "backendId" is set, "backendIdForRequest" will be set to "backendId",
// otherwise, "backendIdForRequest" will be set to one of the available backends.
private long backendIdForRequest = -1;
private boolean sharedStorage = false;

public LocalTableValuedFunction(Map<String, String> properties) throws AnalysisException {
// 1. analyze common properties
Expand All @@ -66,14 +75,35 @@ public LocalTableValuedFunction(Map<String, String> properties) throws AnalysisE
}
}
filePath = otherProps.get(PROP_FILE_PATH);
backendId = Long.parseLong(otherProps.get(PROP_BACKEND_ID));
backendId = Long.parseLong(otherProps.getOrDefault(PROP_BACKEND_ID, "-1"));
sharedStorage = Boolean.parseBoolean(otherProps.getOrDefault(PROP_SHARED_STORAGE, "false"));

// If not shared storage, backend_id is required
// If is shared storage, backend_id can still be set, so that we can query data on single BE node.
// Otherwise, if shared storage is true, we may use multi BE nodes.
if (backendId == -1 && !sharedStorage) {
throw new AnalysisException("'backend_id' is required when 'shared_storage' is false.");
}

// 3. parse file
getFileListFromBackend();
}

private void getFileListFromBackend() throws AnalysisException {
Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
Backend be = null;
if (backendId != -1) {
be = Env.getCurrentSystemInfo().getBackend(backendId);
backendIdForRequest = backendId;
} else {
Preconditions.checkState(sharedStorage);
List<Long> beIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (beIds.isEmpty()) {
throw new AnalysisException("No available backend");
}
Collections.shuffle(beIds);
be = Env.getCurrentSystemInfo().getBackend(beIds.get(0));
backendIdForRequest = be.getId();
}
if (be == null) {
throw new AnalysisException("backend not found with backend_id = " + backendId);
}
Expand Down Expand Up @@ -125,6 +155,6 @@ public Long getBackendId() {

@Override
protected Backend getBackend() {
return Env.getCurrentSystemInfo().getBackend(backendId);
return Env.getCurrentSystemInfo().getBackend(backendIdForRequest);
}
}