Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
Expand Down Expand Up @@ -316,23 +315,27 @@ public List<Column> getTableColumns() throws AnalysisException {
TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
try {
PFetchTableSchemaRequest request = getFetchTableStructureRequest();
Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance()
.fetchTableStructureAsync(address, request);

InternalService.PFetchTableSchemaResult result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
String errMsg;
if (code != TStatusCode.OK) {
if (!result.getStatus().getErrorMsgsList().isEmpty()) {
errMsg = result.getStatus().getErrorMsgsList().get(0);
} else {
errMsg = "fetchTableStructureAsync failed. backend address: "
+ NetUtils
.getHostPortInAccessibleFormat(address.getHostname(), address.getPort());
InternalService.PFetchTableSchemaResult result = null;

// `request == null` means we don't need to get schemas from BE,
// and we fill a dummy col for this table.
if (request != null) {
Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance()
.fetchTableStructureAsync(address, request);

result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
String errMsg;
if (code != TStatusCode.OK) {
if (!result.getStatus().getErrorMsgsList().isEmpty()) {
errMsg = result.getStatus().getErrorMsgsList().get(0);
} else {
errMsg = "fetchTableStructureAsync failed. backend address: "
+ address.getHostname() + ":" + address.getPort();
}
throw new AnalysisException(errMsg);
}
throw new AnalysisException(errMsg);
}

fillColumns(result);
} catch (RpcException e) {
throw new AnalysisException("fetchTableStructureResult rpc exception", e);
Expand Down Expand Up @@ -393,10 +396,12 @@ private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int start)
return Pair.of(type, parsedNodes);
}

private void fillColumns(InternalService.PFetchTableSchemaResult result)
throws AnalysisException {
if (result.getColumnNums() == 0) {
throw new AnalysisException("The amount of column is 0");
private void fillColumns(InternalService.PFetchTableSchemaResult result) {
// `result == null` means we don't need to get schemas from BE,
// and we fill a dummy col for this table.
if (result == null) {
columns.add(new Column("__dummy_col", ScalarType.createStringType(), true));
return;
}
// add fetched file columns
for (int idx = 0; idx < result.getColumnNums(); ++idx) {
Expand All @@ -412,7 +417,7 @@ private void fillColumns(InternalService.PFetchTableSchemaResult result)
}
}

private PFetchTableSchemaRequest getFetchTableStructureRequest() throws AnalysisException, TException {
private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TException {
// set TFileScanRangeParams
TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
fileScanRangeParams.setFormatType(fileFormatType);
Expand All @@ -429,14 +434,19 @@ private PFetchTableSchemaRequest getFetchTableStructureRequest() throws Analysis
// get first file, used to parse table schema
TBrokerFileStatus firstFile = null;
for (TBrokerFileStatus fileStatus : fileStatuses) {
if (fileStatus.isIsDir()) {
if (fileStatus.isIsDir() || fileStatus.size == 0) {
continue;
}
firstFile = fileStatus;
break;
}

// `firstFile == null` means:
// 1. No matching file path exists
// 2. All matched files have a size of 0
// For these two situations, we don't need to get schema from BE
if (firstFile == null) {
throw new AnalysisException("Can not get first file, please check uri.");
return null;
}

// set TFileRangeDesc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --

-- !desc1 --
__dummy_col TEXT Yes false \N NONE

17 changes: 17 additions & 0 deletions regression-test/data/load_p0/tvf/test_tvf_empty_file.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --

-- !desc --
__dummy_col TEXT Yes false \N NONE

-- !select2 --
1 doris 18
2 nereids 20
3 xxx 22
4 yyy 21

-- !des2 --
c1 TEXT Yes false \N NONE
c2 TEXT Yes false \N NONE
c3 TEXT Yes false \N NONE

11 changes: 11 additions & 0 deletions regression-test/data/load_p0/tvf/test_tvf_error_url.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --

-- !desc --
__dummy_col TEXT Yes false \N NONE

-- !select2 --

-- !desc2 --
__dummy_col TEXT Yes false \N NONE

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hdfs_tvf_error_uri","external,hive,tvf,external_docker") {
String hdfs_port = context.config.otherConfigs.get("hdfs_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

// It's okay to use random `hdfsUser`, but can not be empty.
def hdfsUserName = "doris"
def format = "csv"
def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
def uri = ""

String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
// test csv format
uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/no_exist_file.csv"
format = "csv"
order_qt_select1 """ select * from HDFS(
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}"); """

order_qt_desc1 """ desc function HDFS(
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}"); """
}
}
69 changes: 69 additions & 0 deletions regression-test/suites/load_p0/tvf/test_tvf_empty_file.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_tvf_empty_file", "p0") {
String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");

String path = "regression/datalake"

// ${path}/empty_file_test.csv is an empty file
// so it should return empty sets.
order_qt_select """ SELECT * FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}/${path}/empty_file_test.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"region" = "${region}"
);
"""

order_qt_desc """ desc function S3 (
"uri" = "http://${bucket}.${s3_endpoint}/${path}/empty_file_test.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"region" = "${region}"
);
"""

// ${path}/empty_file_test*.csv matches 3 files:
// empty_file_test.csv, empty_file_test_1.csv, empty_file_test_2.csv
// empty_file_test.csv is an empty file, but
// empty_file_test_1.csv and empty_file_test_2.csv have data
// so it should return data of empty_file_test_1.csv and empty_file_test_2.cs
order_qt_select2 """ SELECT * FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}/${path}/empty_file_test*.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"region" = "${region}"
) order by c1;
"""

order_qt_des2 """ desc function S3 (
"uri" = "http://${bucket}.${s3_endpoint}/${path}/empty_file_test*.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"region" = "${region}"
);
"""
}
61 changes: 61 additions & 0 deletions regression-test/suites/load_p0/tvf/test_tvf_error_url.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_tvf_error_url", "p0") {
String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");

String path = "select_tvf/no_exists_file_test"
order_qt_select """ SELECT * FROM S3 (
"uri" = "http://${s3_endpoint}/${bucket}/${path}/no_exist_file1.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"region" = "${region}"
);
"""

order_qt_desc """ desc function S3 (
"uri" = "http://${s3_endpoint}/${bucket}/${path}/no_exist_file1.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"region" = "${region}"
);
"""

order_qt_select2 """ SELECT * FROM S3 (
"uri" = "http://${s3_endpoint}/${bucket}/${path}/*.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"region" = "${region}"
);
"""

order_qt_desc2 """ desc function S3 (
"uri" = "http://${s3_endpoint}/${bucket}/${path}/*.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"region" = "${region}"
);
"""
}