diff --git a/be/src/http/action/http_load.cpp b/be/src/http/action/http_load.cpp new file mode 100644 index 00000000000000..d89a727695d2da --- /dev/null +++ b/be/src/http/action/http_load.cpp @@ -0,0 +1,572 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http/action/http_load.h" + +// use string iequal +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "common/consts.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/utils.h" +#include "http/http_channel.h" +#include "http/http_common.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/utils.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/storage_engine.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/load_path_mgr.h" +#include "runtime/message_body_sink.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/stream_load_executor.h" +#include "runtime/stream_load/stream_load_recorder.h" +#include "util/byte_buffer.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" +#include "util/string_util.h" +#include "util/thrift_rpc_helper.h" +#include "util/time.h" +#include "util/uid_util.h" +#include "util/url_coding.h" + +namespace doris { +using namespace ErrorCode; + +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_load_requests_total, MetricUnit::REQUESTS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_load_duration_ms, MetricUnit::MILLISECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_load_current_processing, MetricUnit::REQUESTS); + +void HttpLoadAction::_parse_format(const std::string& format_str, + const std::string& compress_type_str, + TFileFormatType::type* format_type, + TFileCompressType::type* compress_type) { + if (format_str.empty()) { + _parse_format("CSV", compress_type_str, format_type, compress_type); + return; + } + *compress_type = TFileCompressType::PLAIN; + *format_type = TFileFormatType::FORMAT_UNKNOWN; + if (iequal(format_str, "CSV")) { + if (compress_type_str.empty()) { + *format_type = TFileFormatType::FORMAT_CSV_PLAIN; + } else if (iequal(compress_type_str, "GZ")) { + *format_type = TFileFormatType::FORMAT_CSV_GZ; + *compress_type = TFileCompressType::GZ; + } else if (iequal(compress_type_str, "LZO")) { + *format_type = TFileFormatType::FORMAT_CSV_LZO; + *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "BZ2")) { + *format_type = TFileFormatType::FORMAT_CSV_BZ2; + *compress_type = TFileCompressType::BZ2; + } else if (iequal(compress_type_str, "LZ4")) { + *format_type 
= TFileFormatType::FORMAT_CSV_LZ4FRAME; + *compress_type = TFileCompressType::LZ4FRAME; + } else if (iequal(compress_type_str, "LZOP")) { + *format_type = TFileFormatType::FORMAT_CSV_LZOP; + *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "DEFLATE")) { + *format_type = TFileFormatType::FORMAT_CSV_DEFLATE; + *compress_type = TFileCompressType::DEFLATE; + } + } else if (iequal(format_str, "JSON")) { + if (compress_type_str.empty()) { + *format_type = TFileFormatType::FORMAT_JSON; + } + } else if (iequal(format_str, "PARQUET")) { + *format_type = TFileFormatType::FORMAT_PARQUET; + } else if (iequal(format_str, "ORC")) { + *format_type = TFileFormatType::FORMAT_ORC; + } +} + +bool HttpLoadAction::_is_format_support_streaming(TFileFormatType::type format) { + switch (format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_JSON: + return true; + default: + return false; + } +} + +HttpLoadAction::HttpLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) { + _http_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("http_load"); + INT_COUNTER_METRIC_REGISTER(_http_load_entity, http_load_requests_total); + INT_COUNTER_METRIC_REGISTER(_http_load_entity, http_load_duration_ms); + INT_GAUGE_METRIC_REGISTER(_http_load_entity, http_load_current_processing); +} + +HttpLoadAction::~HttpLoadAction() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_http_load_entity); +} + +void HttpLoadAction::handle(HttpRequest* req) { + std::shared_ptr ctx = + std::static_pointer_cast(req->handler_ctx()); + if (ctx == nullptr) { + return; + } + + THttpLoadPutParams params; + // status already set to fail + if (ctx->status.ok()) { + ctx->status = _handle(params, ctx); + if (!ctx->status.ok() && !ctx->status.is()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; + } + } + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; + + if (!ctx->status.ok() && !ctx->status.is()) { + if (ctx->need_rollback) { + _exec_env->stream_load_executor()->rollback_txn(ctx.get()); + ctx->need_rollback = false; + } + if (ctx->body_sink.get() != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + } + + auto str = ctx->to_json(); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); +#ifndef BE_TEST + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } +#endif +} + +Status HttpLoadAction::_handle(THttpLoadPutParams& params, std::shared_ptr ctx) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { + LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes + << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; + return Status::InternalError("receive body don't equal with body bytes"); + } + if (!ctx->use_streaming) { + // if we use non-streaming, we need to close file first, + // then execute_plan_fragment here + // this will close file + ctx->body_sink.reset(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx)); + } else { + RETURN_IF_ERROR(ctx->body_sink->finish()); + } + // wait stream load finish + RETURN_IF_ERROR(ctx->future.get()); 
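+    // With two_phase_commit enabled the transaction is only pre-committed here
+    // and is finalized later by a separate commit/abort request; otherwise the
+    // transaction is committed and published as soon as the load plan finishes.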
+ + if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); + ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; + } + return Status::OK(); +} + +int HttpLoadAction::on_header(HttpRequest* req) { + http_load_current_processing->increment(1); + + std::shared_ptr ctx = std::make_shared(_exec_env); + req->set_handler_ctx(ctx); + + // parse sql through rpc request fe + THttpLoadPutRequest request; + THttpLoadPutParams params; + request.__set_loadId(ctx->id.to_thrift()); + request.__set_load_sql(req->header(HTTP_SQL)); + + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, ¶ms](FrontendServiceConnection& client) { + client->httpLoadPut(params, request); + }); + + ctx->load_type = TLoadType::MANUL_LOAD; + ctx->load_src_type = TLoadSourceType::RAW; + + url_decode(params.db, &ctx->db); + url_decode(params.table, &ctx->table); + ctx->label = params.label; + if (ctx->label.empty()) { + ctx->label = generate_uuid_string(); + } + + ctx->two_phase_commit = params.two_phase_commit; + + LOG(INFO) << "new income http load request." << ctx->brief() + << " sql : " << req->header(HTTP_SQL) << ", db=" << ctx->db << ", tbl=" << ctx->table; + + auto st = _on_header(req, params, ctx); + if (!st.ok()) { + ctx->status = std::move(st); + if (ctx->need_rollback) { + _exec_env->stream_load_executor()->rollback_txn(ctx.get()); + ctx->need_rollback = false; + } + if (ctx->body_sink.get() != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + auto str = ctx->to_json(); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + http_load_current_processing->increment(-1); +#ifndef BE_TEST + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } +#endif + return -1; + } + return 0; +} + +Status HttpLoadAction::_on_header(HttpRequest* http_req, THttpLoadPutParams& params, + std::shared_ptr ctx) { + // auth information + if (!parse_basic_auth(*http_req, &ctx->auth)) { + LOG(WARNING) << "parse basic authorization failed." 
<< ctx->brief(); + return Status::InternalError("no valid Basic authorization"); + } + + std::string format_str = params.format; + if (iequal(format_str, BeConsts::CSV_WITH_NAMES) || + iequal(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) { + ctx->header_type = format_str; + //treat as CSV + format_str = BeConsts::CSV; + } + _parse_format(format_str, params.compress_type, &ctx->format, &ctx->compress_type); + if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { + return Status::InternalError("unknown data format, format={}", params.format); + } + + // check content length + ctx->body_bytes = 0; + size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; + size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024; + bool read_json_by_line = params.read_json_by_line; + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + // json max body size + if ((ctx->format == TFileFormatType::FORMAT_JSON) && + (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) { + return Status::InternalError( + "The size of this batch exceed the max size [{}] of json type data " + " data [ {} ]. Split the file, or use 'read_json_by_line'", + json_max_body_bytes, ctx->body_bytes); + } + // csv max body size + else if (ctx->body_bytes > csv_max_body_bytes) { + LOG(WARNING) << "body exceed max size." << ctx->brief(); + return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes, + ctx->body_bytes); + } + } else { +#ifndef BE_TEST + evhttp_connection_set_max_body_size( + evhttp_request_get_connection(http_req->get_evhttp_request()), csv_max_body_bytes); +#endif + } + + if (!params.timeout.empty()) { + try { + ctx->timeout_second = std::stoi(params.timeout); + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid timeout format, {}", e.what()); + } + } + if (!params.comment.empty()) { + ctx->load_comment = params.comment; + } + // begin transaction + int64_t begin_txn_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get())); + ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time; + + // process put file + return _process_put(params, ctx); +} + +void HttpLoadAction::on_chunk_data(HttpRequest* req) { + std::shared_ptr ctx = + std::static_pointer_cast(req->handler_ctx()); + if (ctx == nullptr || !ctx->status.ok()) { + return; + } + + struct evhttp_request* ev_req = req->get_evhttp_request(); + auto evbuf = evhttp_request_get_input_buffer(ev_req); + + int64_t start_read_data_time = MonotonicNanos(); + while (evbuffer_get_length(evbuf) > 0) { + auto bb = ByteBuffer::allocate(128 * 1024); + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + auto st = ctx->body_sink->append(bb); + if (!st.ok()) { + LOG(WARNING) << "append body content failed. 
errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; + } + ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time); +} + +void HttpLoadAction::free_handler_ctx(std::shared_ptr param) { + std::shared_ptr ctx = std::static_pointer_cast(param); + if (ctx == nullptr) { + return; + } + // sender is gone, make receiver know it + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel("sender is gone"); + } + // remove stream load context from stream load manager and the resource will be released + ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); +} + +Status HttpLoadAction::_process_put(THttpLoadPutParams& params, + std::shared_ptr ctx) { + // Now we use stream + ctx->use_streaming = _is_format_support_streaming(ctx->format); + + // put request + TStreamLoadPutRequest request; + set_request_auth(&request, ctx->auth); + request.db = ctx->db; + request.tbl = ctx->table; + request.txnId = ctx->txn_id; + request.formatType = ctx->format; + request.__set_compress_type(ctx->compress_type); + request.__set_header_type(ctx->header_type); + request.__set_loadId(ctx->id.to_thrift()); + if (ctx->use_streaming) { + auto pipe = std::make_shared( + io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + ctx->body_bytes /* total_length */); + request.fileType = TFileType::FILE_STREAM; + ctx->body_sink = pipe; + ctx->pipe = pipe; + RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx)); + } else { + RETURN_IF_ERROR(_data_saved_path(params, &request.path)); + auto file_sink = std::make_shared(request.path); + RETURN_IF_ERROR(file_sink->open()); + request.__isset.path = true; + request.fileType = TFileType::FILE_LOCAL; + request.__set_file_size(ctx->body_bytes); + ctx->body_sink = file_sink; + } + // params setting + if (!params.columns.empty()) { + request.__set_columns(params.columns); + } + if (!params.where.empty()) { + request.__set_where(params.where); + } + if (!params.column_separator.empty()) { + request.__set_columnSeparator(params.column_separator); + } + if (!params.line_delimiter.empty()) { + request.__set_line_delimiter(params.line_delimiter); + } + if (!params.partitions.empty()) { + request.__set_partitions(params.partitions); + request.__set_isTempPartition(false); + if (!params.temporary_partitions.empty()) { + return Status::InvalidArgument( + "Can not specify both partitions and temporary partitions"); + } + } + if (!params.temporary_partitions.empty()) { + request.__set_partitions(params.temporary_partitions); + request.__set_isTempPartition(true); + if (!params.partitions.empty()) { + return Status::InvalidArgument( + "Can not specify both partitions and temporary partitions"); + } + } + + request.__set_negative(params.negative); + request.__set_strictMode(params.strict_mode); + + if (!params.timeout.empty()) { + request.__set_timezone(params.timeout); + } + + if (!params.exec_mem_limit.empty()) { + try { + request.__set_execMemLimit(std::stoll(params.exec_mem_limit)); + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid mem limit format, {}", e.what()); + } + } + if (!params.jsonpaths.empty()) { + request.__set_jsonpaths(params.jsonpaths); + } + if (!params.json_root.empty()) { + request.__set_json_root(params.json_root); + } + request.__set_strip_outer_array(params.strip_outer_array); + request.__set_num_as_string(params.num_as_string); + request.__set_fuzzy_parse(params.fuzzy_parse); + 
request.__set_read_json_by_line(params.read_json_by_line); + // TODO function_column.sequence_col + + request.__set_send_batch_parallelism(params.send_batch_parallelism); + request.__set_load_to_single_tablet(params.load_to_single_tablet); + + if (ctx->timeout_second != -1) { + request.__set_timeout(ctx->timeout_second); + } + request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + TMergeType::type merge_type = TMergeType::APPEND; + StringCaseMap merge_type_map = {{"APPEND", TMergeType::APPEND}, + {"DELETE", TMergeType::DELETE}, + {"MERGE", TMergeType::MERGE}}; + if (!params.merge_type.empty()) { + std::string merge_type_str = params.merge_type; + if (merge_type_map.find(merge_type_str) != merge_type_map.end()) { + merge_type = merge_type_map.find(merge_type_str)->second; + } else { + return Status::InvalidArgument("Invalid merge type {}", merge_type_str); + } + if (merge_type == TMergeType::MERGE && params.delete_condition.empty()) { + return Status::InvalidArgument("Excepted DELETE ON clause when merge type is MERGE."); + } else if (merge_type != TMergeType::MERGE && !params.delete_condition.empty()) { + return Status::InvalidArgument( + "Not support DELETE ON clause when merge type is not MERGE."); + } + } + request.__set_merge_type(merge_type); + if (!params.delete_condition.empty()) { + request.__set_delete_condition(params.delete_condition); + } + ctx->max_filter_ratio = params.max_filter_ratio; + request.__set_max_filter_ratio(ctx->max_filter_ratio); + if (!params.hidden_columns.empty()) { + request.__set_hidden_columns(params.hidden_columns); + } + request.__set_trim_double_quotes(params.trim_double_quotes); + request.__set_skip_lines(params.skip_lines); + request.__set_enable_profile(params.enable_profile); + request.__set_partial_update(params.partial_columns); + +#ifndef BE_TEST + // plan this load + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + int64_t stream_load_put_start_time = MonotonicNanos(); + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, ctx](FrontendServiceConnection& client) { + client->streamLoadPut(ctx->put_result, request); + })); + ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; +#else + +#endif + Status plan_status(Status::create(ctx->put_result.status)); + if (!plan_status.ok()) { + LOG(WARNING) << "plan http load failed. errmsg=" << plan_status << ctx->brief(); + return plan_status; + } + + VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params); + // if we not use streaming, we must download total content before we begin + // to process this load + if (!ctx->use_streaming) { + return Status::OK(); + } + + return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); +} + +Status HttpLoadAction::_data_saved_path(THttpLoadPutParams& params, std::string* file_path) { + std::string prefix; + RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(params.db, "", &prefix)); + timeval tv; + gettimeofday(&tv, nullptr); + struct tm tm; + time_t cur_sec = tv.tv_sec; + localtime_r(&cur_sec, &tm); + char buf[64]; + strftime(buf, 64, "%Y%m%d%H%M%S", &tm); + std::stringstream ss; + ss << prefix << "/" << params.table << "." << buf << "." 
<< tv.tv_usec; + *file_path = ss.str(); + return Status::OK(); +} + +void HttpLoadAction::_save_stream_load_record(std::shared_ptr ctx, + const std::string& str) { + auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); + if (stream_load_recorder != nullptr) { + std::string key = + std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label; + auto st = stream_load_recorder->put(key, str); + if (st.ok()) { + LOG(INFO) << "put http_load_record rocksdb successfully. label: " << ctx->label + << ", key: " << key; + } + } else { + LOG(WARNING) << "put http_load_record rocksdb failed. stream_load_recorder is null."; + } +} +} // namespace doris diff --git a/be/src/http/action/http_load.h b/be/src/http/action/http_load.h new file mode 100644 index 00000000000000..053eea0926baa4 --- /dev/null +++ b/be/src/http/action/http_load.h @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "http/http_handler.h" +#include "util/metrics.h" + +namespace doris { + +class ExecEnv; +class Status; +class StreamLoadContext; + +class HttpLoadAction : public HttpHandler { +public: + HttpLoadAction(ExecEnv* exec_env); + ~HttpLoadAction() override; + + void handle(HttpRequest* req) override; + + bool request_will_be_read_progressively() override { return true; } + + int on_header(HttpRequest* req) override; + + void on_chunk_data(HttpRequest* req) override; + void free_handler_ctx(std::shared_ptr ctx) override; + +private: + Status _on_header(HttpRequest* http_req, THttpLoadPutParams& params, + std::shared_ptr ctx); + Status _handle(THttpLoadPutParams& params, std::shared_ptr ctx); + Status _data_saved_path(THttpLoadPutParams& params, std::string* file_path); + Status _process_put(THttpLoadPutParams& params, std::shared_ptr ctx); + void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); + void _parse_format(const std::string& format_str, const std::string& compress_type_str, + TFileFormatType::type* format_type, TFileCompressType::type* compress_type); + bool _is_format_support_streaming(TFileFormatType::type format); + +private: + ExecEnv* _exec_env; + + std::shared_ptr _http_load_entity; + IntCounter* http_load_requests_total; + IntCounter* http_load_duration_ms; + IntGauge* http_load_current_processing; +}; + +} // namespace doris diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 86df938af14fdd..a87c87daf57a7a 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -59,5 +59,6 @@ static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; static const std::string 
HTTP_TXN_OPERATION_KEY = "txn_operation"; +static const std::string HTTP_SQL = "sql"; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 0e004b12f55f1e..7b164c2ae20929 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -171,6 +171,7 @@ class StreamLoadContext { TStreamLoadPutResult put_result; TStreamLoadMultiTablePutResult multi_table_put_result; + THttpLoadPutResult http_put_result; std::vector commit_infos; diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 487e84c1d6197c..537340bff165e8 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -31,6 +31,7 @@ #include "http/action/download_binlog_action.h" #include "http/action/file_cache_action.h" #include "http/action/health_action.h" +#include "http/action/http_load.h" #include "http/action/jeprofile_actions.h" #include "http/action/meta_action.h" #include "http/action/metrics_action.h" @@ -78,6 +79,8 @@ Status HttpService::start() { streamload_2pc_action); _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_2pc", streamload_2pc_action); + HttpLoadAction* httpload_action = _pool.add(new HttpLoadAction(_env)); + _ev_http_server->register_handler(HttpMethod::PUT, "/api/v2/_load", httpload_action); // register download action std::vector allow_paths; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 55a43ed4c5251b..f267a1bd00c045 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -651,13 +651,10 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { return Status::InvalidArgument( "start offset of TFileRangeDesc must be zero in get parsered schema"); } - if (_params.file_type == TFileType::FILE_STREAM || - _params.file_type == TFileType::FILE_BROKER) { + if (_params.file_type == TFileType::FILE_BROKER) { return Status::InternalError( - "Getting parsered schema from csv file do not support stream load and broker " - "load."); + "Getting parsered schema from csv file do not support broker load."); } - // csv file without names line and types line. *read_line = 1; *is_parse_name = false; @@ -675,6 +672,8 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _file_description.start_offset = start_offset; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); + reader_options.modification_time = + _range.__isset.modification_time ? _range.modification_time : 0; _file_description.mtime = _range.__isset.modification_time ? 
_range.modification_time : 0; RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description, reader_options, &_file_system, &_file_reader));
diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/http.md b/docs/en/docs/sql-manual/sql-functions/table-functions/http.md new file mode 100644 index 00000000000000..b3b04aebeb732d --- /dev/null +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/http.md @@ -0,0 +1,106 @@ +--- +{ + "title": "http", + "language": "en" +} +--- + + + +## Http + +### Name + +### Description + +The http table function (table-valued-function, tvf) allows users to upload and load data using SQL. + +#### syntax +```sql +curl --location-trusted -u user:passwd -H "sql: '${load_sql}'" -T data.csv http://127.0.0.1:8030/api/v2/_load +``` + +Where ${load_sql} is in the following format +``` +insert into db.table select * from +http( +"format" = "CSV", +"column_separator" = "," +... +) [where t1 > 0]; +``` + +**parameter description** + +Related parameters for accessing http: +- column_separator + + Used to specify the column separator in the load file. The default is `\t`. If it is an invisible character, you need to add `\x` as a prefix and use hexadecimal to represent the separator. + + For example, the separator `\x01` of a hive file needs to be specified as `-H "column_separator:\x01"`. + + You can use a combination of multiple characters as the column separator. + +- line_delimiter + + Used to specify the line delimiter in the load file. The default is `\n`. + + You can use a combination of multiple characters as the line delimiter. + +- max_filter_ratio + + The maximum tolerated error ratio of the import task. The default is 0, and the value range is 0-1. When the import error ratio exceeds this value, the import fails. + + If the user wishes to ignore erroneous rows, the import can succeed by setting this parameter to a value greater than 0. + + The calculation formula is as follows: + + `(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL)) > max_filter_ratio` + + `dpp.abnorm.ALL` denotes the number of rows whose data quality is not up to standard, such as type mismatch, column count mismatch, length mismatch, and so on. + + `dpp.norm.ALL` refers to the number of correct rows in the import process. The correct row count of an import task can be queried with the `SHOW LOAD` command. + +The number of rows in the original file = `dpp.abnorm.ALL + dpp.norm.ALL` + +- Partitions + + The partition information of the table to be imported. If the data being imported does not belong to the specified partitions, it will not be imported; such rows are counted in `dpp.abnorm.ALL`. + +- format + + Specifies the import data format. csv and json are supported; the default is csv. + + +- exec_mem_limit + + Import memory limit. The default is 2GB, in bytes. + +### Examples + +Upload data +```shell +curl -v --location-trusted -u root: -H "sql: insert into test.t1(k1,k2) select k1,k2 from http(\"format\" = \"CSV\", \"column_separator\" = \",\")" -T example.csv http://127.0.0.1:8030/api/v2/_load +``` + + +### Keywords + + http, table-valued-function, tvf
diff --git a/docs/sidebars.json b/docs/sidebars.json index 350f93755cc2da..6fd1e72c42c234 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -710,6 +710,7 @@ "sql-manual/sql-functions/table-functions/explode-numbers", "sql-manual/sql-functions/table-functions/s3", "sql-manual/sql-functions/table-functions/hdfs", + "sql-manual/sql-functions/table-functions/http", "sql-manual/sql-functions/table-functions/iceberg_meta", "sql-manual/sql-functions/table-functions/backends", "sql-manual/sql-functions/table-functions/frontends",
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/http.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/http.md new file mode 100644 index 00000000000000..4d31bc7e4e67cb --- /dev/null +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/http.md @@ -0,0 +1,106 @@ +--- +{ + "title": "http", + "language": "zh-CN" +} +--- + + + +## Http + +### Name + +### Description + +http 表函数(table-valued-function,tvf),可以让用户使用 sql 方式上传数据 + +#### syntax +```sql +curl --location-trusted -u user:passwd -H "sql: '${load_sql}'" -T data.csv http://127.0.0.1:8030/api/v2/_load +``` + +其中 ${load_sql} 为以下格式 +``` +insert into db.table select * from +http( +"format" = "CSV", +"column_separator" = "," +... +) [where t1 > 0]; +``` + +**参数说明** + +访问 http 相关参数: +- column_separator + + 用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。 + + 如 hive 文件的分隔符\x01,需要指定为-H "column_separator:\x01"。 + + 可以使用多个字符的组合作为列分隔符。 + +- line_delimiter + + 用于指定导入文件中的换行符,默认为\n。 + + 可以使用多个字符的组合作为换行符。 + +- max_filter_ratio + + 导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。 + + 如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。 + + 计算公式为: + + `(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio` + + `dpp.abnorm.ALL` 表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。 + + `dpp.norm.ALL` 指的是导入过程中正确数据的条数。可以通过 `SHOW LOAD` 命令查询导入任务的正确数据量。 + + 原始文件的行数 = `dpp.abnorm.ALL + dpp.norm.ALL` + +- Partitions + + 待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 `dpp.abnorm.ALL` + +- format + + 指定导入数据格式,支持csv、json,默认是csv + + +- exec_mem_limit + + 导入内存限制。默认为 2GB,单位为字节。 + +### Examples + +上传数据 +```shell +curl -v --location-trusted -u root: -H "sql: insert into test.t1(k1,k2) select k1,k2 from http(\"format\" = \"CSV\", \"column_separator\" = \",\")" -T example.csv http://127.0.0.1:8030/api/v2/_load +``` + + +### Keywords + + http, table-valued-function, tvf
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java index ba1b07eb4c6bf6..a09b86005cdaa7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java @@ -44,7 +44,10 @@ public TableValuedFunctionRef(String funcName, String alias, Map this.funcName = funcName; this.params = params; this.tableFunction = TableValuedFunctionIf.getTableFunction(funcName, params); - this.table = tableFunction.getTable(); + // skip http tvf + if (!funcName.equals("http")) { + this.table = tableFunction.getTable(); + } if (hasExplicitAlias()) { return; } @@ -125,4 +128,7 @@ public TableValuedFunctionIf getTableFunction() { return
tableFunction; } + public Map getParams() { + return params; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index fa02f458076e6e..05c8930f71682a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -56,6 +56,15 @@ public class LoadAction extends RestBaseController { private ExecuteEnv execEnv = ExecuteEnv.getInstance(); + @RequestMapping(path = "/api/v2/_load", method = RequestMethod.PUT) + public Object httpLoad(HttpServletRequest request, HttpServletResponse response) { + if (needRedirect(request.getScheme())) { + return redirectToHttps(request); + } + executeCheckPassword(request, response); + return executeHttpLoad(request); + } + @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT) public Object load(HttpServletRequest request, HttpServletResponse response, @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { @@ -177,6 +186,22 @@ private Object executeWithoutPassword(HttpServletRequest request, } } + private Object executeHttpLoad(HttpServletRequest request) { + try { + if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) { + return new RestBaseResult("There is no 100-continue header"); + } + final String clusterName = ConnectContext.get().getClusterName(); + TNetworkAddress redirectAddr = selectRedirectBackend(clusterName); + LOG.info("redirect http load action to destination={}", redirectAddr.toString()); + + RedirectView redirectView = redirectTo(request, redirectAddr); + return redirectView; + } catch (Exception e) { + return new RestBaseResult(e.getMessage()); + } + } + private Object executeStreamLoad2PC(HttpServletRequest request, String db) { try { String dbName = db; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Http.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Http.java new file mode 100644 index 00000000000000..ab6cf8fe9e5231 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Http.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
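+// Nereids "http" table-valued function. It is referenced from the load SQL sent
+// with an /api/v2/_load request, e.g. "insert into t select * from http(...)".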
+ +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.TVFProperties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.statistics.Statistics; +import org.apache.doris.tablefunction.HttpTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** http */ +public class Http extends TableValuedFunction { + public Http(TVFProperties properties) { + super("http", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE, (List) getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new HttpTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build HttpTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public Statistics computeStats(List slots) { + return new Statistics(0, new HashMap<>()); + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitHttp(this, context); + } + + @Override + public PhysicalProperties getPhysicalProperties() { + return PhysicalProperties.ANY; + } +}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 0c4adcabe1ebb4..ab506cadfb6410 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.trees.expressions.visitor; import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; +import org.apache.doris.nereids.trees.expressions.functions.table.Http; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; @@ -34,6 +35,10 @@ default R visitHdfs(Hdfs hdfs, C context) { return visitTableValuedFunction(hdfs, context); } + + default R visitHttp(Http http, C context) { + return visitTableValuedFunction(http, context); + } + default R visitS3(S3 s3, C context) { return visitTableValuedFunction(s3, context); }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index faa79efbb88586..10b4f807291e2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -21,10 +21,17 @@ import org.apache.doris.analysis.AddColumnsClause; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.ColumnDef; +import org.apache.doris.analysis.InsertStmt; import
org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.RestoreStmt; +import org.apache.doris.analysis.SelectStmt; import org.apache.doris.analysis.SetType; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.analysis.TableValuedFunctionRef; import org.apache.doris.analysis.TypeDef; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.backup.Snapshot; @@ -58,6 +65,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.annotation.LogException; +import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.Util; import org.apache.doris.cooldown.CooldownDelete; import org.apache.doris.datasource.CatalogIf; @@ -128,6 +136,8 @@ import org.apache.doris.thrift.TGetTablesResult; import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; import org.apache.doris.thrift.TGetTabletReplicaInfosResult; +import org.apache.doris.thrift.THttpLoadPutParams; +import org.apache.doris.thrift.THttpLoadPutRequest; import org.apache.doris.thrift.TInitExternalCtlMetaRequest; import org.apache.doris.thrift.TInitExternalCtlMetaResult; import org.apache.doris.thrift.TListPrivilegesResult; @@ -187,10 +197,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.io.StringReader; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -1785,6 +1797,89 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ return result; } + @Override + public THttpLoadPutParams httpLoadPut(THttpLoadPutRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive http load put request: {}, backend: {}", request, clientAddr); + THttpLoadPutParams params = null; + try { + params = httpLoadPutImpl(request); + } catch (UserException e) { + LOG.warn("failed to get stream load plan: {}", e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + } + return params; + } + + private THttpLoadPutParams httpLoadPutImpl(THttpLoadPutRequest request) throws UserException { + LOG.info("receive http load put request"); + THttpLoadPutParams params = new THttpLoadPutParams(); + String loadSql = request.getLoadSql(); + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setQueryId(request.getLoadId()); + ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER); + ctx.setCurrentUserIdentity(UserIdentity.ROOT); + ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser()); + ctx.setThreadLocalInfo(); + + SqlScanner input = new SqlScanner(new StringReader(loadSql), ctx.getSessionVariable().getSqlMode()); + SqlParser parser = new SqlParser(input); + try { + StatementBase parsedStmt = SqlParserUtils.getFirstStmt(parser); + if (parsedStmt instanceof InsertStmt) { + InsertStmt insertStmt = (InsertStmt) parsedStmt; + + SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt(); + + List tableRefList = selectStmt.getTableRefs(); + TableValuedFunctionRef tableValuedFunctionRef = (TableValuedFunctionRef) 
tableRefList.get(0); + // case-insensitive + Map paramMap = new CaseInsensitiveMap(tableValuedFunctionRef.getParams()); + + params.setDb(insertStmt.getDbName()); + params.setTable(insertStmt.getTbl()); + params.setLabel(insertStmt.getLabel()); + // setting params + params.setColumnSeparator(paramMap.get("column_separator")); + params.setLineDelimiter(paramMap.get("line_delimiter")); + params.setMaxFilterRatio(Double.valueOf(paramMap.getOrDefault("max_filter_ratio", "0"))); + params.setWhere(paramMap.get("where")); + params.setPartitions(paramMap.get("partitions")); + params.setTemporaryPartitions(paramMap.get("temporary_partitions")); + params.setColumns(paramMap.get("columns")); + params.setFormat(paramMap.get("format")); + params.setExecMemLimit(paramMap.get("exec_mem_limit")); + params.setStrictMode(Boolean.valueOf(paramMap.getOrDefault("strict_mode", "false"))); + // params.setMergeType((TMergeType) paramMap.get("merge_type")); + params.setTwoPhaseCommit(Boolean.valueOf(paramMap.getOrDefault("two_phase_commit", "false"))); + params.setEnableProfile(Boolean.valueOf(paramMap.getOrDefault("enable_profile", "false"))); + params.setCompressType(paramMap.get("compress_type")); + params.setReadJsonByLine(Boolean.valueOf(paramMap.getOrDefault("read_json_by_line", "false"))); + params.setTimeout(paramMap.get("timeout")); + params.setComment(paramMap.get("comment")); + params.setNegative(Boolean.valueOf(paramMap.getOrDefault("negative", "false"))); + params.setJsonpaths(paramMap.get("jsonpaths")); + params.setJsonRoot(paramMap.get("json_root")); + params.setStripOuterArray(Boolean.valueOf(paramMap.getOrDefault("strip_outer_array", "false"))); + params.setNumAsString(Boolean.valueOf(paramMap.getOrDefault("num_as_string", "false"))); + params.setFuzzyParse(Boolean.valueOf(paramMap.getOrDefault("fuzzy_parse", "false"))); + params.setSequenceCol(paramMap.get("sequence_col")); + params.setSendBatchParallelism(Integer.valueOf(paramMap.getOrDefault("send_batch_parallelism", "0"))); + params.setLoadToSingleTablet(Boolean.valueOf(paramMap.getOrDefault("load_to_single_tablet", "false"))); + params.setDeleteCondition(paramMap.get("delete_condition")); + params.setHiddenColumns(paramMap.get("hidden_columns")); + params.setTrimDoubleQuotes(Boolean.valueOf(paramMap.getOrDefault("trim_double_quotes", "false"))); + params.setSkipLines(Integer.valueOf(paramMap.getOrDefault("skip_lines", "0"))); + params.setPartialColumns(Boolean.valueOf(paramMap.getOrDefault("partial_columns", "false"))); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return params; + } + private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws UserException { String cluster = request.getCluster(); if (Strings.isNullOrEmpty(cluster)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 49961afcbc0f01..6d53f8f1ee0c0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -82,7 +82,7 @@ import java.util.stream.Collectors; /** - * ExternalFileTableValuedFunction is used for S3/HDFS/LOCAL table-valued-function + * ExternalFileTableValuedFunction is used for S3/HDFS/LOCAL/http table-valued-function */ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf { public 
static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class); @@ -187,10 +187,19 @@ public List getPathPartitionKeys() { protected void parseFile() throws AnalysisException { String path = getFilePath(); BrokerDesc brokerDesc = getBrokerDesc(); - try { - BrokerUtil.parseFile(path, brokerDesc, fileStatuses); - } catch (UserException e) { - throw new AnalysisException("parse file failed, path = " + path, e); + // create dummy file status for http load + if (getTFileType() == TFileType.FILE_STREAM) { + TBrokerFileStatus fileStatus = new TBrokerFileStatus(); + fileStatus.setPath(""); + fileStatus.setIsDir(false); + fileStatus.setSize(-1); // must set to -1, means stream. + fileStatuses.add(fileStatus); + } else { + try { + BrokerUtil.parseFile(path, brokerDesc, fileStatuses); + } catch (UserException e) { + throw new AnalysisException("parse file failed, path = " + path, e); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpTableValuedFunction.java new file mode 100644 index 00000000000000..162eb542001a03 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpTableValuedFunction.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.thrift.TFileType; + +import org.apache.commons.collections.map.CaseInsensitiveMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * The Implement of table valued function + * http("FORMAT" = "csv", "column_separator" = ","). 
+ */ +public class HttpTableValuedFunction extends ExternalFileTableValuedFunction { + private static final Logger LOG = LogManager.getLogger(HttpTableValuedFunction.class); + public static final String NAME = "http"; + + public HttpTableValuedFunction(Map params) throws AnalysisException { + Map validParams = new CaseInsensitiveMap(); + for (String key : params.keySet()) { + if (!FILE_FORMAT_PROPERTIES.contains(key.toLowerCase())) { + throw new AnalysisException(key + " is invalid property"); + } + validParams.put(key.toLowerCase(), params.get(key)); + } + parseProperties(validParams); + parseFile(); + } + + // =========== implement abstract methods of ExternalFileTableValuedFunction ================= + @Override + public TFileType getTFileType() { + switch (getTFileFormatType()) { + case FORMAT_PARQUET: + case FORMAT_ORC: + return TFileType.FILE_LOCAL; + default: + return TFileType.FILE_STREAM; + } + } + + @Override + public String getFilePath() { + return null; + } + + @Override + public BrokerDesc getBrokerDesc() { + return new BrokerDesc("HttpTvfBroker", StorageType.STREAM, locationProperties); + } + + // =========== implement abstract methods of TableValuedFunctionIf ================= + @Override + public String getTableName() { + return "HttpTableValuedFunction"; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index ea135b8b1b873b..3fd450c80ee763 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -49,6 +49,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map pipeline_params } +// HttpLoad request, used to load a streaming to engine +struct THttpLoadPutRequest { + 1: required Types.TUniqueId loadId + 2: optional string load_sql // insert into sql used by http load +} + +struct THttpLoadPutResult { + 1: required Status.TStatus status + 2: required i64 txn_id + 3: optional i64 total_rows + 4: optional i64 loaded_rows + 5: optional i64 filtered_rows + 6: optional i64 unselected_rows +} + +// httptvf properties +struct THttpLoadPutParams { + 1: required string db + 2: required string table + 3: optional string label + 4: optional string column_separator + 5: optional string line_delimiter + 6: optional double max_filter_ratio + 7: optional string where + 8: optional string partitions + 9: optional string temporary_partitions + 10: optional string columns + 11: required string format + 12: optional string exec_mem_limit + 13: optional bool strict_mode + 14: optional string merge_type + 15: optional bool two_phase_commit + 16: optional bool enable_profile + 17: optional string compress_type + 18: optional bool read_json_by_line + 19: optional string timeout + 20: optional string comment + 21: optional bool negative + 22: optional string jsonpaths + 23: optional string json_root + 24: optional bool strip_outer_array + 25: optional bool num_as_string + 26: optional bool fuzzy_parse + 27: optional string sequence_col + 28: optional i32 send_batch_parallelism + 29: optional bool load_to_single_tablet + 30: optional string delete_condition // delete + 31: optional string hidden_columns + 32: optional bool trim_double_quotes // trim double quotes for csv + 33: optional i32 skip_lines // csv skip line num, only used when csv header_type is not set. 
+ 34: optional bool partial_columns +} + struct TKafkaRLTaskProgress { 1: required map partitionCmtOffset } @@ -1120,6 +1173,8 @@ service FrontendService { TStreamLoadMultiTablePutResult streamLoadMultiTablePut(1: TStreamLoadPutRequest request) + THttpLoadPutParams httpLoadPut(1: THttpLoadPutRequest request) + Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request) TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request) diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index da19eb49758bef..2fcd8e47140b2e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -375,6 +375,7 @@ struct TFileScanRangeParams { // Map of slot to its position in table schema. Only for Hive external table. 19: optional map slot_name_to_schema_pos 20: optional list pre_filter_exprs_list + 21: optional Types.TUniqueId load_id } struct TFileRangeDesc { diff --git a/regression-test/data/load_p0/http_load/test_csv_load.csv b/regression-test/data/load_p0/http_load/test_csv_load.csv new file mode 100644 index 00000000000000..5b3552c36914ee --- /dev/null +++ b/regression-test/data/load_p0/http_load/test_csv_load.csv @@ -0,0 +1,2 @@ +10000,北京 +10001,天津 \ No newline at end of file diff --git a/regression-test/data/load_p0/http_load/test_http_load.out b/regression-test/data/load_p0/http_load/test_http_load.out new file mode 100644 index 00000000000000..1b8499919d1318 --- /dev/null +++ b/regression-test/data/load_p0/http_load/test_http_load.out @@ -0,0 +1,5 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +10000 北京 +10001 天津 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/HttpLoadAction.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/HttpLoadAction.groovy new file mode 100644 index 00000000000000..9c8fedb4551141 --- /dev/null +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/HttpLoadAction.groovy @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
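+// Regression-test action for the /api/v2/_load endpoint: it builds an authenticated
+// PUT request from the configured headers, sends it to the FE, expects a 307
+// redirect to a BE, then re-sends the request with the load data attached and
+// checks the returned JSON result.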
+ +package org.apache.doris.regression.action + +import com.google.common.collect.Iterators +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.FromString +import org.apache.doris.regression.suite.SuiteContext +import org.apache.doris.regression.util.BytesInputStream +import org.apache.doris.regression.util.JdbcUtils +import org.apache.doris.regression.util.OutputUtils +import groovy.json.JsonSlurper +import groovy.util.logging.Slf4j +import org.apache.http.HttpEntity +import org.apache.http.HttpStatus +import org.apache.http.client.methods.CloseableHttpResponse +import org.apache.http.client.methods.RequestBuilder +import org.apache.http.entity.FileEntity +import org.apache.http.entity.InputStreamEntity +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils +import org.junit.Assert + +@Slf4j +class HttpLoadAction implements SuiteAction { + public final InetSocketAddress address + public final String user + public final String password + String db + String table + String file + InputStream inputStream + String inputText + Iterator> inputIterator + long time + Closure check + Map headers + SuiteContext context + + HttpLoadAction(SuiteContext context) { + this.address = context.config.feHttpInetSocketAddress + this.user = context.config.feHttpUser + this.password = context.config.feHttpPassword + + def groupList = context.group.split(',') + this.db = context.config.getDbNameByFile(context.file) + + this.context = context + this.headers = new LinkedHashMap<>() + this.headers.put('label', UUID.randomUUID().toString()) + } + + void db(String db) { + this.db = db + } + + void db(Closure db) { + this.db = db.call() + } + + void table(String table) { + this.table = table + } + + void table(Closure table) { + this.table = table.call() + } + + void inputStream(InputStream inputStream) { + this.inputStream = inputStream + } + + void inputStream(Closure inputStream) { + this.inputStream = inputStream.call() + } + + void inputIterator(Iterator> inputIterator) { + this.inputIterator = inputIterator + } + + void inputIterator(Closure>> inputIterator) { + this.inputIterator = inputIterator.call() + } + + void inputText(String inputText) { + this.inputText = inputText + } + + void inputText(Closure inputText) { + this.inputText = inputText.call() + } + + void file(String file) { + this.file = file + } + + void file(Closure file) { + this.file = file.call() + } + + void time(long time) { + this.time = time + } + + void time(Closure time) { + this.time = time.call() + } + + void check(@ClosureParams(value = FromString, options = ["String,Throwable,Long,Long"]) Closure check) { + this.check = check + } + + void set(String key, String value) { + headers.put(key, value) + } + + @Override + void run() { + String responseText = null + Throwable ex = null + long startTime = System.currentTimeMillis() + try { + def uri = "http://${address.hostString}:${address.port}/api/v2/_load" + System.out.println("Hello::::" + uri); + HttpClients.createDefault().withCloseable { client -> + RequestBuilder requestBuilder = prepareRequestHeader(RequestBuilder.put(uri)) + HttpEntity httpEntity = prepareHttpEntity(client) + String beLocation = httpLoadToFe(client, requestBuilder) + responseText = httpLoadToBe(client, requestBuilder, beLocation, httpEntity) + } + } catch (Throwable t) { + ex = t + } + long endTime = System.currentTimeMillis() + + log.info("Http load elapsed 
+        log.info("Http load elapsed ${endTime - startTime} ms, response: ${responseText}".toString()
+                + ex.toString())
+        checkResult(responseText, ex, startTime, endTime)
+    }
+
+    private String httpGetString(CloseableHttpClient client, String url) {
+        return client.execute(RequestBuilder.get(url).build()).withCloseable { resp ->
+            EntityUtils.toString(resp.getEntity())
+        }
+    }
+
+    private InputStream httpGetStream(CloseableHttpClient client, String url) {
+        CloseableHttpResponse resp = client.execute(RequestBuilder.get(url).build())
+        int code = resp.getStatusLine().getStatusCode()
+        if (code != HttpStatus.SC_OK) {
+            String streamBody = EntityUtils.toString(resp.getEntity())
+            throw new IllegalStateException("Get http stream failed, status code is ${code}, body:\n${streamBody}")
+        }
+
+        return resp.getEntity().getContent()
+    }
+
+    private RequestBuilder prepareRequestHeader(RequestBuilder requestBuilder) {
+        String encoding = Base64.getEncoder()
+                .encodeToString((user + ":" + (password == null ? "" : password)).getBytes("UTF-8"))
+        requestBuilder.setHeader("Authorization", "Basic ${encoding}")
+
+        for (Map.Entry entry : headers.entrySet()) {
+            requestBuilder.setHeader(entry.key, entry.value)
+        }
+        requestBuilder.setHeader("Expect", "100-Continue")
+        return requestBuilder
+    }
+
+    private String cacheHttpFile(CloseableHttpClient client, String url) {
+        def relativePath = url.substring(url.indexOf('/', 9))
+        def file = new File("${context.config.cacheDataPath}/${relativePath}")
+        if (file.exists()) {
+            log.info("Found ${url} in ${file.getAbsolutePath()}");
+            return file.getAbsolutePath()
+        }
+        log.info("Start to cache data from ${url} to ${file.getAbsolutePath()}");
+        CloseableHttpResponse resp = client.execute(RequestBuilder.get(url).build())
+        int code = resp.getStatusLine().getStatusCode()
+        if (code != HttpStatus.SC_OK) {
+            String streamBody = EntityUtils.toString(resp.getEntity())
+            log.info("Failed to download data ${url}, code: ${code}, body:\n${streamBody}")
+            throw new IllegalStateException("Get http stream failed, status code is ${code}, body:\n${streamBody}")
+        }
+
+        file.getParentFile().mkdirs();
+        new File("${context.config.cacheDataPath}/tmp/").mkdir()
+        InputStream httpFileStream = resp.getEntity().getContent()
+        File tmpFile = File.createTempFile("cache", null, new File("${context.config.cacheDataPath}/tmp/"))
+
+        java.nio.file.Files.copy(httpFileStream, tmpFile.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+        java.nio.file.Files.move(tmpFile.toPath(), file.toPath(), java.nio.file.StandardCopyOption.ATOMIC_MOVE);
+        log.info("Cached data from ${url} to ${file.getAbsolutePath()}");
+        return file.getAbsolutePath()
+    }
+
+    private HttpEntity prepareHttpEntity(CloseableHttpClient client) {
+        HttpEntity entity = null
+        if (inputStream != null) {
+            entity = new InputStreamEntity(inputStream)
+        } else if (inputText != null) {
+            entity = new StringEntity(inputText)
+        } else if (inputIterator != null) {
+            def bytesIt = Iterators.transform(inputIterator,
+                    {row -> (OutputUtils.toCsvString(row) + "\n").getBytes()})
+            entity = new InputStreamEntity(new BytesInputStream(bytesIt))
+        } else {
+            String fileName = this.file
+            if (fileName.startsWith("http://") || fileName.startsWith("https://")) {
+                log.info("Set Http load input: ${fileName}".toString())
+                def file = new File(context.config.cacheDataPath)
+                file.mkdirs();
+
+                if (file.exists() && file.isDirectory() && context.config.enableCacheData) {
+                    fileName = cacheHttpFile(client, fileName)
+                } else {
+                    entity = new InputStreamEntity(httpGetStream(client, fileName))
+                    return entity;
+                }
+            }
+            if (!new File(fileName).isAbsolute()) {
+                fileName = new File(context.dataPath, fileName).getAbsolutePath()
+            }
+            def file = new File(fileName)
+            if (!file.exists()) {
+                log.warn("Http load input file does not exist: ${file}".toString())
+                throw new IllegalStateException("Http load input file does not exist: ${file}");
+            }
+            log.info("Set Http load input: ${file.canonicalPath}".toString())
+            entity = new FileEntity(file)
+        }
+        return entity
+    }
+
+    private String httpLoadToFe(CloseableHttpClient client, RequestBuilder requestBuilder) {
+        log.info("Http load to ${requestBuilder.uri}".toString())
+        String backendhttpLoadUri = null
+        client.execute(requestBuilder.build()).withCloseable { resp ->
+            resp.withCloseable {
+                String body = EntityUtils.toString(resp.getEntity())
+                def respCode = resp.getStatusLine().getStatusCode()
+                // should redirect to backend
+                if (respCode != 307) {
+                    throw new IllegalStateException("Expect frontend Http load response code is 307, "
+                            + "but meet ${respCode}\nbody: ${body}")
+                }
+                backendhttpLoadUri = resp.getFirstHeader("location").getValue()
+            }
+        }
+        return backendhttpLoadUri
+    }
+
+    private String httpLoadToBe(CloseableHttpClient client, RequestBuilder requestBuilder, String beLocation, HttpEntity httpEntity) {
+        log.info("Redirect Http load to ${beLocation}".toString())
+        requestBuilder.setUri(beLocation)
+        requestBuilder.setEntity(httpEntity)
+        String responseText
+        try {
+            client.execute(requestBuilder.build()).withCloseable { resp ->
+                resp.withCloseable {
+                    String body = EntityUtils.toString(resp.getEntity())
+                    def respCode = resp.getStatusLine().getStatusCode()
+                    if (respCode != 200) {
+                        throw new IllegalStateException("Expect backend Http load response code is 200, "
+                                + "but meet ${respCode}\nbody: ${body}")
+                    }
+                    responseText = body
+                }
+            }
+        } catch (Throwable t) {
+            log.info("HttpLoadAction Exception: ", t)
+        }
+        return responseText
+    }
+
+    private void checkResult(String responseText, Throwable ex, long startTime, long endTime) {
+        String finalStatus = waitForPublishOrFailure(responseText)
+        log.info("The original Http load result: ${responseText}, final status: ${finalStatus}")
+        responseText = responseText.replace("Publish Timeout", finalStatus)
+        if (check != null) {
+            check.call(responseText, ex, startTime, endTime)
+        } else {
+            if (ex != null) {
+                throw ex
+            }
+
+            def jsonSlurper = new JsonSlurper()
+            def result = jsonSlurper.parseText(responseText)
+            String status = result.Status
+            if (!"Success".equalsIgnoreCase(status)) {
+                String errorUrl = result.ErrorURL
+                if (errorUrl != null) {
+                    String errorDetails = HttpClients.createDefault().withCloseable { client ->
+                        httpGetString(client, errorUrl)
+                    }
+                    throw new IllegalStateException("Http load failed:\n${responseText}\n${errorDetails}")
+                }
+                throw new IllegalStateException("Http load failed:\n${responseText}")
+            }
+            long numberTotalRows = result.NumberTotalRows.toLong()
+            long numberLoadedRows = result.NumberLoadedRows.toLong()
+            if (numberTotalRows != numberLoadedRows) {
+                throw new IllegalStateException("Http load rows mismatch:\n${responseText}")
+            }
+
+            if (time > 0) {
+                long elapsed = endTime - startTime
+                try {
+                    Assert.assertTrue("Expect elapsed <= ${time}, but meet ${elapsed}", elapsed <= time)
+                } catch (Throwable t) {
+                    throw new IllegalStateException("Expect elapsed <= ${time}, but meet ${elapsed}")
+                }
+            }
+        }
+    }
+
+    // Sometimes the Http load may return "PUBLISH TIMEOUT".
+    // This is not a fatal error but may cause the test to fail.
+    // So here we wait for at most 60s, using "show transaction" to check the
+    // status of the txn, and return once it becomes ABORTED or VISIBLE.
+    private String waitForPublishOrFailure(String responseText) {
+        try {
+            long maxWaitSecond = 60;
+            def jsonSlurper = new JsonSlurper()
+            def parsed = jsonSlurper.parseText(responseText)
+            String status = parsed.Status
+            long txnId = parsed.TxnId
+            if (!status.equalsIgnoreCase("Publish Timeout")) {
+                return status;
+            }
+
+            log.info("Http load with txn ${txnId} is publish timeout")
+            String sql = "show transaction from ${db} where id = ${txnId}"
+            String st = "PREPARE"
+            while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && maxWaitSecond > 0) {
+                Thread.sleep(2000)
+                maxWaitSecond -= 2
+                def (result, meta) = JdbcUtils.executeToStringList(context.getConnection(), sql)
+                if (result.size() != 1) {
+                    throw new IllegalStateException("Failed to get status of txn ${txnId}")
+                }
+                st = String.valueOf(result[0][3])
+            }
+            log.info("Http load with txn ${txnId} is ${st}")
+            if (st.equalsIgnoreCase("VISIBLE")) {
+                return "Success";
+            } else {
+                return "Fail";
+            }
+        } catch (Throwable t) {
+            log.info("failed to waitForPublishOrFailure. response: ${responseText}", t);
+            throw t;
+        }
+    }
+}
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index bd8647178078aa..7a161e92810ac8 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -29,6 +29,7 @@ import org.apache.doris.regression.action.CreateMVAction
 import org.apache.doris.regression.action.ExplainAction
 import org.apache.doris.regression.action.RestoreAction
 import org.apache.doris.regression.action.StreamLoadAction
+import org.apache.doris.regression.action.HttpLoadAction
 import org.apache.doris.regression.action.SuiteAction
 import org.apache.doris.regression.action.TestAction
 import org.apache.doris.regression.action.HttpCliAction
@@ -477,6 +478,10 @@ class Suite implements GroovyInterceptable {
         runAction(new StreamLoadAction(context), actionSupplier)
     }
 
+    void httpLoad(Closure actionSupplier) {
+        runAction(new HttpLoadAction(context), actionSupplier)
+    }
+
     void restore(Closure actionSupplier) {
         runAction(new RestoreAction(context), actionSupplier)
     }
diff --git a/regression-test/framework/src/main/groovy/suite.gdsl b/regression-test/framework/src/main/groovy/suite.gdsl
index da55904de87e25..20c473681f5b43 100644
--- a/regression-test/framework/src/main/groovy/suite.gdsl
+++ b/regression-test/framework/src/main/groovy/suite.gdsl
@@ -47,6 +47,7 @@ def bindAction = { actionName, actionClassName ->
 bindAction("test", "org.apache.doris.regression.action.TestAction")
 bindAction("explain", "org.apache.doris.regression.action.ExplainAction")
 bindAction("streamLoad", "org.apache.doris.regression.action.StreamLoadAction")
+bindAction("httpLoad", "org.apache.doris.regression.action.HttpLoadAction")
 bindAction("httpTest", "org.apache.doris.regression.action.HttpCliAction")
 bindAction("benchmark", "org.apache.doris.regression.action.BenchmarkAction")
 
@@ -79,6 +80,7 @@ contributor([suiteContext]) {
             (!enclosingCall("test") &&
                     !enclosingCall("explain") &&
                     !enclosingCall("streamLoad") &&
+                    !enclosingCall("httpLoad") &&
                     !enclosingCall("httpTest"))) {
         // bind other suite method and field
         def suiteClass = findClass(suiteClassName)
diff --git a/regression-test/suites/load_p0/http_load/test_http_load.groovy b/regression-test/suites/load_p0/http_load/test_http_load.groovy
new file mode 100644
index 00000000000000..def60769872b4b
--- /dev/null
+++ b/regression-test/suites/load_p0/http_load/test_http_load.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_http_load", "p0") {
+    sql "show tables"
+
+    def tableName = "test_http_load_table"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int,
+            `k2` varchar(100)
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "storage_format" = "V2",
+            "light_schema_change" = "true",
+            "disable_auto_compaction" = "false",
+            "enable_single_replica_compaction" = "false"
+        );
+    """
+    // test success
+
+    httpLoad {
+        set 'sql', """
+            insert into ${db}.${tableName} (k1, k2) select k1, k2 from http("format"="csv", "column_separator" = ",")
+            """
+
+        file 'test_csv_load.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+    qt_sql "select * from ${tableName} order by k1, k2"
+}
+
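Note: below is a minimal sketch of driving the new httpLoad action from inline data instead of a data file, using only the hooks defined in HttpLoadAction.groovy above (set, inputText, time, check). The suite name, table name, and the two CSV rows are hypothetical; the target table is assumed to have the same (k1 int, k2 varchar) schema as test_http_load_table in the suite above.

suite("example_http_load_inline", "p0") {
    // hypothetical table, same two-column schema as test_http_load_table above
    def tableName = "example_http_load_table"

    httpLoad {
        // the 'sql' header drives the load, exactly as in test_http_load.groovy
        set 'sql', """
            insert into ${db}.${tableName} (k1, k2) select k1, k2 from http("format"="csv", "column_separator" = ",")
            """

        // inline CSV payload instead of a file under regression-test/data
        inputText "10002,上海\n10003,广州"

        time 10000

        // a custom check replaces the default validation in checkResult
        check { result, exception, startTime, endTime ->
            if (exception != null) {
                throw exception
            }
            def json = new groovy.json.JsonSlurper().parseText(result)
            assert "Success".equalsIgnoreCase((String) json.Status)
        }
    }
}

Because a check closure is supplied, HttpLoadAction.checkResult skips its built-in status and row-count assertions, so the closure has to verify whatever the suite actually cares about.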