Skip to content

Conversation

@Cai-Yao
Copy link
Contributor

@Cai-Yao Cai-Yao commented Feb 20, 2023

Proposed changes

Issue Number: close #xxx

Problem summary

In stream load, add a 'sql' parameter to the header to replace the 'column_separator', 'line_delimiter', 'where', 'columns' in the previous parameter, which is convenient to use.

curl --location-trusted -u user:passwd [-H "sql: ${load_sql}"...] -T data.file -XPUT http://fe_host:http_port/api/_stream_load_with_sql


# -- load_sql
# insert into db.table (col, ...) select stream_col, ... from stream("property1"="value1");

# stream
# (
#     "column_separator" = ",",
#     "format" = "CSV",
#     ...
# )

Examples:

curl  --location-trusted -u root: -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from stream("format" = "CSV", "column_separator" = "," ) where age >= 30"  http://127.0.0.1:28030/api/_stream_load_with_sql

Checklist(Required)

  • Does it affect the original behavior
  • Has unit tests been added
  • Has document been added or modified
  • Does it need to update dependencies
  • Is this PR support rollback (If NO, please explain WHY)

Further comments

If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...

@github-actions github-actions bot added area/planner Issues or PRs related to the query planner kind/docs Categorizes issue or PR as related to documentation. kind/test labels Feb 20, 2023
Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clang-tidy made some suggestions

Status _process_put_with_load_sql(HttpRequest* http_req, StreamLoadContext* ctx);
void _save_stream_load_record(StreamLoadContext* ctx, const std::string& str);

private:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]

Suggested change
private:

be/src/http/action/stream_load_with_sql.h:46: previously declared here

private:
^

streamLoad {
set 'version', '1'
set 'sql', """
insert into ${db}.${tableName3} select c1, c2, year(c14), month(c14), day(c14) from stream("format"="csv")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

增加一下 行分隔符、列分隔符的测试case

ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;

ctx->db = req->param(HTTP_DB_KEY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not use db and table in url, should use db and table in sql to check privileges.

// default csv
ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;

if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line useful?


Status StreamLoadWithSqlAction::_on_header(HttpRequest* http_req, StreamLoadContext* ctx) {
// auth information
if (!parse_basic_auth(*http_req, &ctx->auth)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless. Should use the db and table in sql to check privileges.

int64_t begin_txn_start_time = MonotonicNanos();
// RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx));
ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lost some code:
// check content length
ctx->body_bytes = 0;
size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024;
bool read_json_by_line = false;
if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
read_json_by_line = true;
}
}
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
// json max body size
if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
(ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
return Status::InternalError(
"The size of this batch exceed the max size [{}] of json type data "
" data [ {} ]. Split the file, or use 'read_json_by_line'",
json_max_body_bytes, ctx->body_bytes);
}
// csv max body size
else if (ctx->body_bytes > csv_max_body_bytes) {
LOG(WARNING) << "body exceed max size." << ctx->brief();
return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes,
ctx->body_bytes);
}
} else {
#ifndef BE_TEST
evhttp_connection_set_max_body_size(
evhttp_request_get_connection(http_req->get_evhttp_request()), csv_max_body_bytes);
#endif
}

if (!http_req->header(HTTP_TIMEOUT).empty()) {
    try {
        ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
    } catch (const std::invalid_argument& e) {
        return Status::InvalidArgument("Invalid timeout format");
    }
}


// begin transaction
int64_t begin_txn_start_time = MonotonicNanos();
// RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

begin txn not called? how to set txn_id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

begin_txn on fe

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing,
MetricUnit::REQUESTS);

#ifdef BE_TEST
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete the code

int64_t start_read_data_time = MonotonicNanos();
const size_t buffer_max_size = 1 * 1024 * 1024;
size_t buffer_size = 0;
char* buffer = new char[buffer_max_size];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment for buffer variable.

} else {
LOG(WARNING) << "_exec_env->master_info not set backend_id";
}
request.__set_backend_id(10046);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10046???


Status StreamLoadWithSqlAction::_process_put(HttpRequest* http_req, StreamLoadContext* ctx) {
// Now we use stream
ctx->use_streaming = is_format_support_streaming(ctx->format);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not support streaming, any other codes?

// ctx->future.wait_for(std::chrono::seconds(config::max_fragment_start_wait_time_seconds));
// if (!ctx->future.valid()) {
// return Status::TimedOut("data receive timeout");
// }
Copy link
Contributor

@yiguolei yiguolei Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not wait all time. For example, if fe crashed, then fe will never set promise, then the http thread will hang. You should wait for 1 second, then call fe to check the load status using load id, if not find then just let load failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not wait all time. For example, if fe crashed, then fe will never set promise, then the http thread will hang. You should wait for 1 second, then call fe to check the txn status, if not find then just let load failed.

OK, I get.

// RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
}
while (!ctx->is_stream_load_put_success) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this code means?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this code means?

This code is maybe useless, I wanted to make sure that the put_process is executed before the handle.

}
}

static bool is_format_support_streaming(TFileFormatType::type format) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not copy parse_format and parse_format, too many duplicate code is hard to maintain. could use StreamLoadAction::is_format_support_streaming or StreamLoadAction::parse_format?


public static BlobStorage create(String name, StorageBackend.StorageType type, Map<String, String> properties) {
if (type == StorageBackend.StorageType.S3) {
if (type == StorageBackend.StorageType.S3 || type == StorageBackend.StorageType.STREAM) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add code here?

for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) {
long streamLoadBackendId = ctx.getBackendId();
if (be.getId() == streamLoadBackendId) {
LOG.info("cwk newLocations");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad log info.

}
try {
info.getCoord().updateFragmentExecStatus(params);
info.getCoord().setIsReportExecStatus(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add this code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add this code?

why add this code?

The purpose is to determine whether FE has updated the status of BE execution so that streamload_action can query the execution status

}

private void streamLoadPutWithSqlImpl(TStreamLoadPutRequest request) throws UserException {
String loadSql = request.getLoadSql();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log here to indicate that we receive a load request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log here to indicate that we receive a load request.

OK

SqlScanner input = new SqlScanner(new StringReader(loadSql), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
StatementBase parsedStmt = SqlParserUtils.getFirstStmt(parser);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not call coord.exec() in rpc thread context. It will use up all rpc threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should add the coord in to a map<load id, coord> and use a thread pool to check coord status and send status to related be.

TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)

TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)
TStreamLoadWithLoadStatusResult StreamLoadWithLoadStatus(1: TStreamLoadWithLoadStatusRequest request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamLoadWithLoadStatus --> streamLoadWithLoadStatus

DCHECK(req.status.ok() || req.done); // if !status.ok() => done
Status exec_status = req.update_fn(req.status);

if (_exec_env->new_load_stream_mgr()->have_promise(req.query_id)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not depend on this to set promise. coordinator_callback only means the fragment is finished on be, but there are some state on fe.
Add a rpc method in be, and if fe find the coord finished, then fe call this rpc service to indicate the load finished.

// Once this is set to true, errors from remote fragments are ignored.
private boolean returnedAllResults;

private boolean isReportExecStatus;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add this variable?

}
request.__set_execMemLimit(2 * 1024 * 1024 * 1024L);
request.fileType = TFileType::FILE_STREAM;
request.__set_thrift_rpc_timeout_ms(20000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any config for thrift rpc timeout?

} else {
LOG(WARNING) << "_exec_env->master_info not set backend_id";
}
request.__set_execMemLimit(2 * 1024 * 1024 * 1024L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a config in config.h for this variable.

// time interval to clean expired stream load records
CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
// use memory in stream load default
CONF_Int64(stream_load_exec_mem_limit, "214748364"); // 2G
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CONF_mInt64(stream_load_exec_mem_limit, "214748364");
Maybe it's better to adjust this parameter dynamically?

result.setStatus(status);
try {
result.setParams(streamLoadPutImpl(request));
if (request.getVersion() == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use enum type to indicate stream load type.

result.setStatus(status);
try {
result.setParams(streamLoadPutImpl(request));
if (request.getVersion() == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the version variable may not be set for non-sql-load. Should check if it is set.

return result;
}

public class ReportStreamLoadWorker implements Runnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a seperate class file in load package, not in frontendservice.

ctx.setBackendId(request.getBackendId());
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
ctx.setStreamLoadInfo(streamLoadTask);
ctx.setLoadId(request.getLoadId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on line 1277, you alread set queryid to load id. so that load id is useless now.

QeProcessorImpl.INSTANCE.registerQuery(request.getLoadId(), coord);
coord.exec();
} catch (UserException e) {
LOG.warn("exec sql error {}", e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG.warn("exec sql error {}", e);

coord.setQueryType(TQueryType.LOAD);
QeProcessorImpl.INSTANCE.registerQuery(request.getLoadId(), coord);
coord.exec();
} catch (UserException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is useless, since you already catch throwable at line 1304

@github-actions
Copy link
Contributor

clang-tidy review says "All clean, LGTM! 👍"

@github-actions
Copy link
Contributor

github-actions bot commented Aug 1, 2023

clang-tidy review says "All clean, LGTM! 👍"

yiguolei pushed a commit that referenced this pull request Aug 8, 2023
This PR was originally #16940 , but it has not been updated for a long time due to the original author @Cai-Yao . At present, we will merge some of the code into the master first.

thanks @Cai-Yao @yiguolei
@dataroaring dataroaring closed this Sep 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/planner Issues or PRs related to the query planner area/vectorization kind/docs Categorizes issue or PR as related to documentation. kind/test

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants