572 changes: 572 additions & 0 deletions be/src/http/action/http_load.cpp

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions be/src/http/action/http_load.h
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>

#include "http/http_handler.h"
#include "util/metrics.h"

namespace doris {

class ExecEnv;
class Status;
class StreamLoadContext;

class HttpLoadAction : public HttpHandler {
public:
HttpLoadAction(ExecEnv* exec_env);
~HttpLoadAction() override;

void handle(HttpRequest* req) override;

bool request_will_be_read_progressively() override { return true; }

int on_header(HttpRequest* req) override;

void on_chunk_data(HttpRequest* req) override;
void free_handler_ctx(std::shared_ptr<void> ctx) override;

private:
Status _on_header(HttpRequest* http_req, THttpLoadPutParams& params,
std::shared_ptr<StreamLoadContext> ctx);
Status _handle(THttpLoadPutParams& params, std::shared_ptr<StreamLoadContext> ctx);
Status _data_saved_path(THttpLoadPutParams& params, std::string* file_path);
Status _process_put(THttpLoadPutParams& params, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
void _parse_format(const std::string& format_str, const std::string& compress_type_str,
TFileFormatType::type* format_type, TFileCompressType::type* compress_type);
bool _is_format_support_streaming(TFileFormatType::type format);

private:
ExecEnv* _exec_env;

std::shared_ptr<MetricEntity> _http_load_entity;
IntCounter* http_load_requests_total;
IntCounter* http_load_duration_ms;
IntGauge* http_load_current_processing;
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/http/http_common.h
@@ -59,5 +59,6 @@ static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns";
 static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
 static const std::string HTTP_TXN_ID_KEY = "txn_id";
 static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
+static const std::string HTTP_SQL = "sql";
 
 } // namespace doris
1 change: 1 addition & 0 deletions be/src/runtime/stream_load/stream_load_context.h
@@ -171,6 +171,7 @@ class StreamLoadContext {
 
     TStreamLoadPutResult put_result;
     TStreamLoadMultiTablePutResult multi_table_put_result;
+    THttpLoadPutResult http_put_result;
 
     std::vector<TTabletCommitInfo> commit_infos;
 
3 changes: 3 additions & 0 deletions be/src/service/http_service.cpp
@@ -31,6 +31,7 @@
 #include "http/action/download_binlog_action.h"
 #include "http/action/file_cache_action.h"
 #include "http/action/health_action.h"
+#include "http/action/http_load.h"
 #include "http/action/jeprofile_actions.h"
 #include "http/action/meta_action.h"
 #include "http/action/metrics_action.h"
@@ -78,6 +79,8 @@ Status HttpService::start() {
                                       streamload_2pc_action);
     _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_2pc",
                                       streamload_2pc_action);
+    HttpLoadAction* httpload_action = _pool.add(new HttpLoadAction(_env));
+    _ev_http_server->register_handler(HttpMethod::PUT, "/api/v2/_load", httpload_action);
 
     // register download action
     std::vector<std::string> allow_paths;
9 changes: 4 additions & 5 deletions be/src/vec/exec/format/csv/csv_reader.cpp
@@ -651,13 +651,10 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
         return Status::InvalidArgument(
                 "start offset of TFileRangeDesc must be zero in get parsered schema");
     }
-    if (_params.file_type == TFileType::FILE_STREAM ||
-        _params.file_type == TFileType::FILE_BROKER) {
+    if (_params.file_type == TFileType::FILE_BROKER) {
         return Status::InternalError(
-                "Getting parsered schema from csv file do not support stream load and broker "
-                "load.");
+                "Getting parsered schema from csv file do not support broker load.");
     }
-
     // csv file without names line and types line.
     *read_line = 1;
     *is_parse_name = false;
@@ -675,6 +672,8 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
 
     _file_description.start_offset = start_offset;
     io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
+    reader_options.modification_time =
+            _range.__isset.modification_time ? _range.modification_time : 0;
     _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
     RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description,
                                                     reader_options, &_file_system, &_file_reader));
106 changes: 106 additions & 0 deletions docs/en/docs/sql-manual/sql-functions/table-functions/http.md
@@ -0,0 +1,106 @@
---
{
"title": "http",
"language": "en"
}
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Http

### Name

http

### Description

The `http` table function (table-valued function, TVF) lets users upload a data file and load it with a single SQL statement.

#### Syntax

```shell
curl --location-trusted -u user:passwd -H "sql: '${load_sql}'" -T data.csv http://127.0.0.1:8030/api/v2/_load
```

where `${load_sql}` has the following format:
```sql
insert into db.table select * from
http(
"format" = "CSV",
"column_separator" = ","
...
) [where t1 > 0];
```
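
For example, a complete request applying the optional `where` filter might look like the following sketch (the database `test`, table `t1`, and column `k1` are illustrative):

```shell
# A sketch only: load data.csv into test.t1, keeping rows where k1 > 0
curl --location-trusted -u root: \
    -H "sql: insert into test.t1 select * from http(\"format\" = \"CSV\", \"column_separator\" = \",\") where k1 > 0" \
    -T data.csv http://127.0.0.1:8030/api/v2/_load
```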

**Parameter description**

Parameters related to the `http` table function:
- column_separator

Used to specify the column separator in the load file. The default is `\t`. If the separator is an invisible character, prefix it with `\x` and write it in hexadecimal.

For example, the Hive file separator `\x01` is specified as `"column_separator" = "\x01"` (see the sketch after this parameter list).

A combination of multiple characters can be used as the column separator.

- line_delimiter

Used to specify the line delimiter in the load file. The default is `\n`.

A combination of multiple characters can be used as the line delimiter.

- max_filter_ratio

The maximum error ratio tolerated by the load job. The default is 0, and the value range is 0 to 1. If the load's error ratio exceeds this value, the load fails.

If you want erroneous rows to be ignored, set this parameter to a value greater than 0 so the load can succeed.

The calculation formula is as follows:

`(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL)) > max_filter_ratio`

`dpp.abnorm.ALL` denotes the number of rows whose data quality is not up to standard, for example because of type mismatch, column count mismatch, or length mismatch.

`dpp.norm.ALL` refers to the number of rows loaded correctly. The correct row count of a load job can be queried with the `SHOW LOAD` command.

Number of rows in the original file = `dpp.abnorm.ALL + dpp.norm.ALL`
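
For example, if a 1000-row file contains 5 rows that fail quality checks, the error ratio is 5 / 1000 = 0.005, so the load succeeds only when `max_filter_ratio` is set to at least 0.005.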

- Partitions

Partition information of the destination table. If a row being loaded does not belong to the specified partitions, it is not loaded and is counted toward `dpp.abnorm.ALL`.

- format

Specifies the format of the data to load. csv and json are supported; the default is csv.


- exec_mem_limit

Memory limit for the load. The default is 2 GB; the unit is bytes.
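
As mentioned under `column_separator`, the following is a minimal sketch of loading a file whose columns are separated by the invisible character `\x01`; the table `test.t1` and the file name `hive_data.csv` are hypothetical:

```shell
# Hypothetical: load a Hive-style file whose column separator is \x01
curl --location-trusted -u root: \
    -H "sql: insert into test.t1 select * from http(\"format\" = \"CSV\", \"column_separator\" = \"\x01\")" \
    -T hive_data.csv http://127.0.0.1:8030/api/v2/_load
```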

### Examples

Upload data
```shell
curl -v --location-trusted -u root: -H "sql: insert into test.t1(k1,k2) select k1,k2 from http(\"format\" = \"CSV\", \"column_separator\" = \",\")" -T example.csv http://127.0.0.1:8030/api/v2/_load
```
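
A hypothetical JSON variant of the same load; this assumes the json format needs no additional parsing options and that `example.json` contains objects with fields `k1` and `k2`:

```shell
curl -v --location-trusted -u root: -H "sql: insert into test.t1(k1,k2) select k1,k2 from http(\"format\" = \"json\")" -T example.json http://127.0.0.1:8030/api/v2/_load
```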


### Keywords

http, table-valued-function, tvf
1 change: 1 addition & 0 deletions docs/sidebars.json
@@ -710,6 +710,7 @@
             "sql-manual/sql-functions/table-functions/explode-numbers",
             "sql-manual/sql-functions/table-functions/s3",
             "sql-manual/sql-functions/table-functions/hdfs",
+            "sql-manual/sql-functions/table-functions/http",
             "sql-manual/sql-functions/table-functions/iceberg_meta",
             "sql-manual/sql-functions/table-functions/backends",
             "sql-manual/sql-functions/table-functions/frontends",
106 changes: 106 additions & 0 deletions docs/zh-CN/docs/sql-manual/sql-functions/table-functions/http.md
@@ -0,0 +1,106 @@
---
{
"title": "http",
"language": "zh-CN"
}
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Http

### Name

http

### Description

http 表函数(table-valued function,TVF),可以让用户通过上传文件并配合 SQL 语句的方式导入数据。

#### Syntax

```shell
curl --location-trusted -u user:passwd -H "sql: '${load_sql}'" -T data.csv http://127.0.0.1:8030/api/v2/_load
```

其中 `${load_sql}` 为以下格式:
```sql
insert into db.table select * from
http(
"format" = "CSV",
"column_separator" = ","
...
) [where t1 > 0];
```
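
例如,一个带可选 `where` 过滤条件的完整请求示意如下(其中库表 `test.t1` 与列名 `k1` 均为假设):

```shell
# 仅为示意:将 data.csv 导入 test.t1,只保留 k1 > 0 的行
curl --location-trusted -u root: \
    -H "sql: insert into test.t1 select * from http(\"format\" = \"CSV\", \"column_separator\" = \",\") where k1 > 0" \
    -T data.csv http://127.0.0.1:8030/api/v2/_load
```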

**参数说明**

访问 http 相关参数:
- column_separator

用于指定导入文件中的列分隔符,默认为 `\t`。如果是不可见字符,则需要加 `\x` 作为前缀,使用十六进制来表示分隔符。

如 Hive 文件的分隔符 `\x01`,需要指定为 `"column_separator" = "\x01"`(参见参数列表后的示意)。

可以使用多个字符的组合作为列分隔符。

- line_delimiter

用于指定导入文件中的换行符,默认为 `\n`。

可以使用多个字符的组合作为换行符。

- max_filter_ratio

导入任务的最大容忍错误率,默认为 0,即不容忍任何错误行,取值范围是 0~1。当导入的错误率超过该值时,导入失败。

如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。

计算公式为:

`(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio`

`dpp.abnorm.ALL` 表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。

`dpp.norm.ALL` 指的是导入过程中正确数据的条数。可以通过 `SHOW LOAD` 命令查询导入任务的正确数据量。

原始文件的行数 = `dpp.abnorm.ALL + dpp.norm.ALL`
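
举例来说,如果一个 1000 行的文件中有 5 行数据质量不合格,则错误率为 5 / 1000 = 0.005,只有当 `max_filter_ratio` 设置为不低于 0.005 时导入才能成功。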

- Partitions

待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition,则不会被导入。这些数据将计入 `dpp.abnorm.ALL`。

- format

指定导入数据格式,支持 csv、json,默认是 csv。


- exec_mem_limit

导入内存限制。默认为 2GB,单位为字节。
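
承接上文 `column_separator` 的说明,下面是一个使用不可见字符 `\x01` 作为列分隔符的最小示意(其中表 `test.t1` 与文件名 `hive_data.csv` 均为假设):

```shell
# 示意:导入以 \x01 作为列分隔符的 Hive 风格文件
curl --location-trusted -u root: \
    -H "sql: insert into test.t1 select * from http(\"format\" = \"CSV\", \"column_separator\" = \"\x01\")" \
    -T hive_data.csv http://127.0.0.1:8030/api/v2/_load
```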

### Examples

上传数据
```shell
curl -v --location-trusted -u root: -H "sql: insert into test.t1(k1,k2) select k1,k2 from http(\"format\" = \"CSV\", \"column_separator\" = \",\")" -T example.csv http://127.0.0.1:8030/api/v2/_load
```
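
导入 JSON 文件的一个假设示例(假定 json 格式无需额外解析参数,且 `example.json` 中包含字段 `k1`、`k2`):

```shell
curl -v --location-trusted -u root: -H "sql: insert into test.t1(k1,k2) select k1,k2 from http(\"format\" = \"json\")" -T example.json http://127.0.0.1:8030/api/v2/_load
```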


### Keywords

http, table-valued-function, tvf
@@ -44,7 +44,10 @@ public TableValuedFunctionRef(String funcName, String alias, Map<String, String>
         this.funcName = funcName;
         this.params = params;
         this.tableFunction = TableValuedFunctionIf.getTableFunction(funcName, params);
-        this.table = tableFunction.getTable();
+        // skip http tvf
+        if (!funcName.equals("http")) {
+            this.table = tableFunction.getTable();
+        }
         if (hasExplicitAlias()) {
             return;
         }
@@ -125,4 +128,7 @@ public TableValuedFunctionIf getTableFunction() {
         return tableFunction;
     }
 
+    public Map<String, String> getParams() {
+        return params;
+    }
 }