From 18e09ec0a66e2fe43bb14646518d13b44bae98bf Mon Sep 17 00:00:00 2001 From: yangzhg Date: Thu, 4 Mar 2021 20:30:53 +0800 Subject: [PATCH] [internal] [doris-1084] support compressed csv file in stream load --- be/src/exec/broker_scanner.cpp | 6 ++-- be/src/http/action/stream_load.cpp | 54 +++++++++++++++++++++++------- be/src/http/http_common.h | 1 + gensrc/thrift/PlanNodes.thrift | 2 +- 4 files changed, 46 insertions(+), 17 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 1844ba8442efdc..e9a991660cdb9f 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -180,7 +180,7 @@ Status BrokerScanner::open_file_reader() { } Status BrokerScanner::create_decompressor(TFileFormatType::type type) { - if (_cur_decompressor == nullptr) { + if (_cur_decompressor != nullptr) { delete _cur_decompressor; _cur_decompressor = nullptr; } @@ -208,7 +208,7 @@ Status BrokerScanner::create_decompressor(TFileFormatType::type type) { break; default: { std::stringstream ss; - ss << "Unknown format type, type=" << type; + ss << "Unknown format type, cannot inference compress type, type=" << type; return Status::InternalError(ss.str()); } } @@ -259,7 +259,7 @@ Status BrokerScanner::open_line_reader() { break; default: { std::stringstream ss; - ss << "Unknown format type, type=" << range.format_type; + ss << "Unknown format type, cannot init line reader, type=" << range.format_type; return Status::InternalError(ss.str()); } } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 47bfdb32e8c75e..a1b9ca2019df1b 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -69,18 +69,46 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit TStreamLoadPutResult k_stream_load_put_result; #endif -static TFileFormatType::type parse_format(const std::string& format_str) { +static TFileFormatType::type parse_format(const std::string& format_str, + const std::string& compress_type) { + if (format_str.empty()) { + return parse_format("CSV", compress_type); + } + TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN; if (boost::iequals(format_str, "CSV")) { - return TFileFormatType::FORMAT_CSV_PLAIN; + if (compress_type.empty()) { + format_type = TFileFormatType::FORMAT_CSV_PLAIN; + } + if (boost::iequals(compress_type, "GZ")) { + format_type = TFileFormatType::FORMAT_CSV_GZ; + } else if (boost::iequals(compress_type, "LZO")) { + format_type = TFileFormatType::FORMAT_CSV_LZO; + } else if (boost::iequals(compress_type, "BZ2")) { + format_type = TFileFormatType::FORMAT_CSV_BZ2; + } else if (boost::iequals(compress_type, "LZ4FRAME")) { + format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; + } else if (boost::iequals(compress_type, "LZOP")) { + format_type = TFileFormatType::FORMAT_CSV_LZOP; + } else if (boost::iequals(compress_type, "DEFLATE")) { + format_type = TFileFormatType::FORMAT_CSV_DEFLATE; + } } else if (boost::iequals(format_str, "JSON")) { - return TFileFormatType::FORMAT_JSON; + if (compress_type.empty()) { + format_type = TFileFormatType::FORMAT_JSON; + } } - return TFileFormatType::FORMAT_UNKNOWN; + return format_type; } static bool is_format_support_streaming(TFileFormatType::type format) { switch (format) { case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_JSON: return true; default: @@ -214,15 +242,15 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct } // get format of this put - if (http_req->header(HTTP_FORMAT_KEY).empty()) { - ctx->format = TFileFormatType::FORMAT_CSV_PLAIN; - } else { - ctx->format = parse_format(http_req->header(HTTP_FORMAT_KEY)); - if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { - std::stringstream ss; - ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY); - return Status::InternalError(ss.str()); - } + if (!http_req->header(HTTP_COMPRESS_TYPE).empty() && boost::iequals(http_req->header(HTTP_FORMAT_KEY), "JSON")) { + return Status::InternalError("compress data of JSON format is not supported."); + } + ctx->format = + parse_format(http_req->header(HTTP_FORMAT_KEY), http_req->header(HTTP_COMPRESS_TYPE)); + if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { + std::stringstream ss; + ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY); + return Status::InternalError(ss.str()); } // check content length diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 5681db6e0c8304..75b8f32eeaff85 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -46,6 +46,7 @@ static const std::string HTTP_MERGE_TYPE = "merge_type"; static const std::string HTTP_DELETE_CONDITION = "delete"; static const std::string HTTP_FUNCTION_COLUMN = "function_column"; static const std::string HTTP_SEQUENCE_COL = "sequence_col"; +static const std::string HTTP_COMPRESS_TYPE = "compress_type"; static const std::string HTTP_100_CONTINUE = "100-continue"; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 58c09c8fd368fe..7c5d70b1d33536 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -105,7 +105,7 @@ enum TFileFormatType { FORMAT_PARQUET, FORMAT_CSV_DEFLATE, FORMAT_ORC, - FORMAT_JSON + FORMAT_JSON, } // One broker range information.