Merged
419 changes: 419 additions & 0 deletions be/src/http/action/stream_load_with_sql.cpp

Large diffs are not rendered by default.

65 changes: 65 additions & 0 deletions be/src/http/action/stream_load_with_sql.h
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <functional>

#include "gen_cpp/PlanNodes_types.h"
#include "http/http_handler.h"
#include "runtime/client_cache.h"
#include "runtime/message_body_sink.h"

namespace doris {

class ExecEnv;
class Status;
class StreamLoadContext;

class StreamLoadWithSqlAction : public HttpHandler {
public:
StreamLoadWithSqlAction(ExecEnv* exec_env);
~StreamLoadWithSqlAction() override;

void handle(HttpRequest* req) override;

bool request_will_be_read_progressively() override { return true; }

int on_header(HttpRequest* req) override;

void on_chunk_data(HttpRequest* req) override;
void free_handler_ctx(std::shared_ptr<void> ctx) override;

private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
Status _data_saved_path(HttpRequest* req, std::string* file_path);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
void _parse_format(const std::string& format_str, const std::string& compress_type_str,
TFileFormatType::type* format_type, TFileCompressType::type* compress_type);
bool _is_format_support_streaming(TFileFormatType::type format);

private:
ExecEnv* _exec_env;
std::shared_ptr<MetricEntity> _stream_load_with_sql_entity;
IntCounter* streaming_load_with_sql_requests_total;
IntCounter* streaming_load_with_sql_duration_ms;
IntGauge* streaming_load_with_sql_current_processing;
};

} // namespace doris
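To make the callback contract above concrete, here is a minimal, self-contained sketch of the chunked-upload flow these overrides implement. It is an illustration with invented names, not Doris code: `on_header` vets the request, `on_chunk_data` consumes each body chunk, and `handle` finishes the load once the body is complete.

```
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-ins for HttpRequest and the handler; names are
// illustrative only.
struct Request {
    std::string sql_header;          // value of the "sql" HTTP header
    std::vector<std::string> chunks; // body chunks as they arrive
};

class ChunkedUploadHandler {
public:
    // Called once when headers arrive; returning non-zero rejects the request.
    int on_header(const Request& req) { return req.sql_header.empty() ? -1 : 0; }
    // Called for every body chunk; a real handler appends to a stream load pipe.
    void on_chunk_data(const std::string& chunk) { _body += chunk; }
    // Called when the body is complete; a real handler executes the load here.
    void handle() { std::cout << "loading " << _body.size() << " bytes\n"; }

private:
    std::string _body;
};

int main() {
    Request req{"insert into t select * from stream()", {"1,2\n", "3,4\n"}};
    ChunkedUploadHandler h;
    if (h.on_header(req) != 0) return 1;
    for (const auto& c : req.chunks) h.on_chunk_data(c);
    h.handle();
    return 0;
}
```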
1 change: 1 addition & 0 deletions be/src/http/http_common.h
@@ -57,6 +57,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines";
static const std::string HTTP_COMMENT = "comment";
static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns";
static const std::string HTTP_SQL = "sql";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
2 changes: 1 addition & 1 deletion be/src/io/file_factory.cpp
@@ -21,6 +21,7 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>

#include <mutex>
#include <utility>

#include "common/config.h"
@@ -149,7 +150,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}

*file_reader = stream_load_ctx->pipe;

if (file_reader->get() == nullptr) {
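The hunk above resolves a load id to the pipe registered for it. A toy sketch of that registry pattern follows; the names are assumptions for illustration, not the actual `new_load_stream_mgr` API: the HTTP side registers a pipe under the load id, and `create_pipe_reader` looks it up by the same id so the scan can read the bytes the HTTP body carries.

```
#include <map>
#include <memory>
#include <string>

struct StreamLoadPipe {
    std::string buffered; // bytes written by the HTTP handler, read by the scan
};

// Toy registry keyed by load id; returns nullptr for an unknown id, which is
// the case the "unknown stream load id" error above guards against.
class PipeRegistry {
public:
    void put(const std::string& load_id, std::shared_ptr<StreamLoadPipe> pipe) {
        _pipes[load_id] = std::move(pipe);
    }
    std::shared_ptr<StreamLoadPipe> get(const std::string& load_id) const {
        auto it = _pipes.find(load_id);
        return it == _pipes.end() ? nullptr : it->second;
    }

private:
    std::map<std::string, std::shared_ptr<StreamLoadPipe>> _pipes;
};
```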
7 changes: 6 additions & 1 deletion be/src/runtime/stream_load/stream_load_context.h
@@ -31,6 +31,8 @@
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/utils.h"
#include "runtime/exec_env.h"
@@ -157,6 +159,10 @@ class StreamLoadContext {

int64_t txn_id = -1;

// TODO delete code
// for local file
// std::string path;

std::string txn_operation = "";

bool need_rollback = false;
@@ -168,7 +174,6 @@

std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;

TStreamLoadPutResult put_result;
TStreamLoadMultiTablePutResult multi_table_put_result;

7 changes: 7 additions & 0 deletions be/src/service/http_service.cpp
@@ -42,6 +42,7 @@
#include "http/action/snapshot_action.h"
#include "http/action/stream_load.h"
#include "http/action/stream_load_2pc.h"
#include "http/action/stream_load_with_sql.h"
#include "http/action/tablet_migration_action.h"
#include "http/action/tablets_distribution_action.h"
#include "http/action/tablets_info_action.h"
@@ -79,6 +80,12 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_2pc",
streamload_2pc_action);

// register stream load with sql
StreamLoadWithSqlAction* streamload_with_sql_action =
_pool.add(new StreamLoadWithSqlAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT, "/api/_stream_load_with_sql",
streamload_with_sql_action);

// register download action
std::vector<std::string> allow_paths;
for (auto& path : _env->store_paths()) {
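`register_handler` binds an HTTP method and path template to a handler object. A minimal sketch of that routing idea, with invented types rather than the EvHttpServer API:

```
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <utility>

int main() {
    // Route table keyed by (method, path template); handlers are callbacks.
    std::map<std::pair<std::string, std::string>, std::function<void()>> routes;
    routes[{"PUT", "/api/_stream_load_with_sql"}] = [] {
        std::cout << "dispatch to StreamLoadWithSqlAction\n";
    };
    routes.at({"PUT", "/api/_stream_load_with_sql"})(); // dispatch a request
    return 0;
}
```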
28 changes: 28 additions & 0 deletions be/src/service/internal_service.cpp
@@ -49,10 +49,14 @@
#include <vector>

#include "common/config.h"
#include "common/consts.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "gen_cpp/BackendService.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "gutil/integral_types.h"
#include "http/http_client.h"
#include "io/fs/stream_load_pipe.h"
@@ -602,6 +606,14 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
st.to_protobuf(result->mutable_status());
return;
}
if (params.file_type == TFileType::FILE_STREAM) {
    auto stream_load_ctx =
            ExecEnv::GetInstance()->new_load_stream_mgr()->get(params.load_id);
    if (!stream_load_ctx) {
        st = Status::InternalError("unknown stream load id: {}",
                                   UniqueId(params.load_id).to_string());
        st.to_protobuf(result->mutable_status());
        return;
    }
}
result->set_column_nums(col_names.size());
for (size_t idx = 0; idx < col_names.size(); ++idx) {
result->add_column_names(col_names[idx]);
@@ -707,6 +719,22 @@ void PInternalServiceImpl::_get_column_ids_by_tablet_ids(
response->mutable_status()->set_status_code(TStatusCode::OK);
}

void PInternalServiceImpl::report_stream_load_status(google::protobuf::RpcController* controller,
const PReportStreamLoadStatusRequest* request,
PReportStreamLoadStatusResponse* response,
google::protobuf::Closure* done) {
TUniqueId load_id;
load_id.__set_hi(request->load_id().hi());
load_id.__set_lo(request->load_id().lo());
Status st = Status::OK();
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
    if (!stream_load_ctx) {
        st = Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
        st.to_protobuf(response->mutable_status());
        return;
    }
    stream_load_ctx->promise.set_value(st);
st.to_protobuf(response->mutable_status());
}

void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
const PProxyRequest* request, PProxyResult* response,
google::protobuf::Closure* done) {
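The new RPC above fulfills the promise stored in the stream load context. A self-contained sketch of that handshake, assuming (as the surrounding code suggests) that one side blocks on the promise's future until another side reports the final load status:

```
#include <future>
#include <iostream>
#include <string>
#include <thread>

struct StreamLoadCtx {
    std::promise<std::string> promise; // fulfilled once the load finishes
};

int main() {
    StreamLoadCtx ctx;
    std::future<std::string> done = ctx.promise.get_future();

    // Reporting side: plays the role of report_stream_load_status.
    std::thread reporter([&ctx] { ctx.promise.set_value("OK"); });

    // Waiting side: the HTTP handler blocks until the status arrives.
    std::cout << "load finished with status: " << done.get() << "\n";
    reporter.join();
    return 0;
}
```

A `std::promise`/`std::future` pair gives exactly-once signaling, which matches a load that completes exactly once.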
5 changes: 5 additions & 0 deletions be/src/service/internal_service.h
@@ -181,6 +181,11 @@ class PInternalServiceImpl : public PBackendService {
PGetTabletVersionsResponse* response,
google::protobuf::Closure* done) override;

void report_stream_load_status(google::protobuf::RpcController* controller,
const PReportStreamLoadStatusRequest* request,
PReportStreamLoadStatusResponse* response,
google::protobuf::Closure* done) override;

private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
const PExecPlanFragmentRequest* request,
12 changes: 8 additions & 4 deletions be/src/vec/exec/format/csv/csv_reader.cpp
@@ -651,8 +651,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
return Status::InvalidArgument(
        "start offset of TFileRangeDesc must be zero in get parsed schema");
}
if (_params.file_type == TFileType::FILE_STREAM ||
_params.file_type == TFileType::FILE_BROKER) {
if (_params.file_type == TFileType::FILE_BROKER) {
return Status::InternalError(
        "Getting parsed schema from csv file does not support broker load.");

Contributor comment: Now that we support stream load, we should also update the err msg.
@@ -676,8 +675,13 @@
_file_description.start_offset = start_offset;
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description,
reader_options, &_file_system, &_file_reader));
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state));
} else {
RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description,
reader_options, &_file_system,
&_file_reader));
}
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
_params.file_type != TFileType::FILE_BROKER) {
return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
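The hunk above makes schema probing read from the in-memory stream load pipe for `FILE_STREAM`, while other file types keep using a regular file reader and broker load stays unsupported. A compact sketch of that dispatch, with invented types standing in for the Doris reader classes:

```
#include <memory>
#include <stdexcept>

enum class FileType { FILE_LOCAL, FILE_BROKER, FILE_STREAM };

struct Reader {
    virtual ~Reader() = default;
};
struct PipeReader : Reader {};  // reads bytes from the in-memory load pipe
struct FileReader : Reader {};  // reads a file through the file system layer

std::unique_ptr<Reader> make_schema_reader(FileType type) {
    if (type == FileType::FILE_BROKER) {
        // schema probing is still unsupported for broker load
        throw std::invalid_argument("parsed schema does not support broker load");
    }
    if (type == FileType::FILE_STREAM) {
        return std::make_unique<PipeReader>();
    }
    return std::make_unique<FileReader>();
}
```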
28 changes: 28 additions & 0 deletions docs/en/docs/data-operate/import/import-way/stream-load-manual.md
@@ -232,6 +232,34 @@ The number of rows in the original file = `dpp.abnorm.ALL + dpp.norm.ALL`
}
```

### Use stream load with SQL

You can add a `sql` parameter to the request header to replace the `column_separator`, `line_delimiter`, `where`, and `columns` parameters described above, which is more convenient to use.

```
curl --location-trusted -u user:passwd \
    [-H "sql: ${load_sql}"...] \
    -T data.file \
    -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load_with_sql

# -- load_sql
# insert into db.table (col, ...) select stream_col, ... from stream("property1"="value1");
#
# stream
# (
#     "column_separator" = ",",
#     "format" = "CSV",
#     ...
# )
```

Example:

```
curl --location-trusted -u root: -T test.csv -H 'sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from stream("format" = "CSV", "column_separator" = ",") where age >= 30' http://127.0.0.1:28030/api/demo/example_tbl_1/_stream_load_with_sql
```
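For programmatic clients, the same request can be issued with libcurl. This is a hedged sketch reusing the placeholder host, port, credentials, and table from the example above; `CURLOPT_UNRESTRICTED_AUTH` approximates curl's `--location-trusted` flag by re-sending credentials after the FE redirects the request to a BE.

```
#include <curl/curl.h>
#include <cstdio>

int main() {
    curl_global_init(CURL_GLOBAL_DEFAULT);
    CURL* curl = curl_easy_init();
    FILE* data = std::fopen("test.csv", "rb"); // the file passed to -T above
    if (!curl || !data) return 1;

    struct curl_slist* headers = nullptr;
    headers = curl_slist_append(headers,
            "sql: insert into demo.example_tbl_1(user_id, age, cost) "
            "select c1, c4, c7 * 2 from stream(\"format\" = \"CSV\", "
            "\"column_separator\" = \",\") where age >= 30");
    headers = curl_slist_append(headers, "Expect: 100-continue");

    curl_easy_setopt(curl, CURLOPT_URL,
            "http://127.0.0.1:28030/api/demo/example_tbl_1/_stream_load_with_sql");
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
    curl_easy_setopt(curl, CURLOPT_USERPWD, "root:");
    curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);           // PUT with a request body
    curl_easy_setopt(curl, CURLOPT_READDATA, data);
    curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);   // follow the FE -> BE redirect
    curl_easy_setopt(curl, CURLOPT_UNRESTRICTED_AUTH, 1L);

    CURLcode rc = curl_easy_perform(curl);
    if (rc != CURLE_OK) std::fprintf(stderr, "%s\n", curl_easy_strerror(rc));

    curl_slist_free_all(headers);
    curl_easy_cleanup(curl);
    std::fclose(data);
    curl_global_cleanup();
    return rc == CURLE_OK ? 0 : 1;
}
```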

### Return results

Since Stream Load is a synchronous import method, the import result is returned to the user directly in the HTTP response of the import request.
@@ -242,6 +242,31 @@ Since Stream Load uses the HTTP protocol, all information related to the import task
}
```

### Using SQL to express Stream Load parameters

You can add a `sql` parameter to the request header to replace the `column_separator`, `line_delimiter`, `where`, and `columns` parameters described above, which is more convenient to use.

```
curl --location-trusted -u user:passwd \
    [-H "sql: ${load_sql}"...] \
    -T data.file \
    -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load_with_sql

# -- load_sql
# insert into db.table (col, ...) select stream_col, ... from stream("property1"="value1");
#
# stream
# (
#     "column_separator" = ",",
#     "format" = "CSV",
#     ...
# )
```

Example:

```
curl --location-trusted -u root: -T test.csv -H 'sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from stream("format" = "CSV", "column_separator" = ",") where age >= 30' http://127.0.0.1:28030/api/demo/example_tbl_1/_stream_load_with_sql
```

Contributor comment: How is escaping handled?
Contributor comment: Should we also add an example that requests the BE directly? Since they have different URI patterns.

Contributor comment: The documentation here is probably wrong. Let's merge the code first and fix it bit by bit; I'm afraid of having to rebase over and over.



### Return results

@@ -85,6 +85,38 @@ public Object streamLoad(HttpServletRequest request,
return executeWithoutPassword(request, response, db, table, true);
}

@RequestMapping(path = "/api/_stream_load_with_sql",
method = RequestMethod.PUT)
public Object streamLoadWithSql(HttpServletRequest request,
HttpServletResponse response) {
String sql = request.getHeader("sql");
LOG.info("streaming load sql={}", sql);
executeCheckPassword(request, response);
try {
// A 'Load' request must have 100-continue header
if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) {
return new RestBaseResult("There is no 100-continue header");
}

final String clusterName = ConnectContext.get().getClusterName();
if (Strings.isNullOrEmpty(clusterName)) {
return new RestBaseResult("No cluster selected.");
}

String label = request.getHeader(LABEL_KEY);
TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);

LOG.info("redirect load action to destination={}, label: {}",
redirectAddr.toString(), label);

return redirectTo(request, redirectAddr);
} catch (Exception e) {
return new RestBaseResult(e.getMessage());
}
}

@RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC(HttpServletRequest request,
HttpServletResponse response,
30 changes: 30 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -40,6 +40,7 @@
import org.apache.doris.resource.Tag;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionStatus;
@@ -75,6 +76,11 @@ public class ConnectContext {
protected volatile long stmtId;
protected volatile long forwardedStmtId;

// set for stream load with sql
protected volatile TUniqueId loadId;
protected volatile long backendId;
protected volatile LoadTaskInfo streamLoadInfo;

protected volatile TUniqueId queryId;
protected volatile String traceId;
// id for this connection
@@ -320,6 +326,30 @@ public long getStmtId() {
return stmtId;
}

public long getBackendId() {
return backendId;
}

public void setBackendId(long backendId) {
this.backendId = backendId;
}

public TUniqueId getLoadId() {
return loadId;
}

public void setLoadId(TUniqueId loadId) {
this.loadId = loadId;
}

public void setStreamLoadInfo(LoadTaskInfo streamLoadInfo) {
this.streamLoadInfo = streamLoadInfo;
}

public LoadTaskInfo getStreamLoadInfo() {
return streamLoadInfo;
}

public void setStmtId(long stmtId) {
this.stmtId = stmtId;
}