diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp
index 474cd41a6d9b24..013c88b9417074 100644
--- a/be/src/exec/parquet_writer.cpp
+++ b/be/src/exec/parquet_writer.cpp
@@ -30,13 +30,16 @@
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/mem_pool.h"
-#include "runtime/tuple.h"
 #include "util/thrift_util.h"
+#include "util/types.h"
 
 namespace doris {
 
 /// ParquetOutputStream
-ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer) : _file_writer(file_writer) {
+ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer) :
+        _file_writer(file_writer),
+        _cur_pos(0),
+        _written_len(0) {
     set_mode(arrow::io::FileMode::WRITE);
 }
 
@@ -45,12 +48,16 @@ ParquetOutputStream::~ParquetOutputStream() {
 }
 
 arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) {
+    if (_is_closed) {
+        return arrow::Status::OK();
+    }
     size_t written_len = 0;
-    Status st = _file_writer->write(reinterpret_cast<const uint8_t*>(data), nbytes, &written_len);
+    Status st = _file_writer->write(static_cast<const uint8_t*>(data), nbytes, &written_len);
     if (!st.ok()) {
         return arrow::Status::IOError(st.get_error_msg());
     }
     _cur_pos += written_len;
+    _written_len += written_len;
     return arrow::Status::OK();
 }
 
@@ -60,33 +67,395 @@ arrow::Status ParquetOutputStream::Tell(int64_t* position) const {
 }
 
 arrow::Status ParquetOutputStream::Close() {
+    if (_is_closed) {
+        return arrow::Status::OK();
+    }
     Status st = _file_writer->close();
     if (!st.ok()) {
+        LOG(WARNING) << "close parquet output stream failed: " << st.get_error_msg();
         return arrow::Status::IOError(st.get_error_msg());
     }
     _is_closed = true;
     return arrow::Status::OK();
 }
 
+int64_t ParquetOutputStream::get_written_len() {
+    return _written_len;
+}
+
+void ParquetOutputStream::set_written_len(int64_t written_len) {
+    _written_len = written_len;
+}
+
 /// ParquetWriterWrapper
 ParquetWriterWrapper::ParquetWriterWrapper(FileWriter* file_writer,
-        const std::vector<ExprContext*>& output_expr_ctxs)
-    : _output_expr_ctxs(output_expr_ctxs) {
-    // TODO(cmy): implement
-    _outstream = new ParquetOutputStream(file_writer);
+        const std::vector<ExprContext*>& output_expr_ctxs,
+        const std::map<std::string, std::string>& properties,
+        const std::vector<std::vector<std::string>>& schema)
+        : _output_expr_ctxs(output_expr_ctxs),
+          _str_schema(schema),
+          _cur_writed_rows(0),
+          _rg_writer(nullptr) {
+    _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
+    parse_properties(properties);
+    parse_schema(schema);
+    init_parquet_writer();
+}
+
+void ParquetWriterWrapper::parse_properties(const std::map<std::string, std::string>& propertie_map) {
+    parquet::WriterProperties::Builder builder;
+    for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) {
+        std::string property_name = it->first;
+        std::string property_value = it->second;
+        if (property_name == "compression") {
+            // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
+            if (property_value == "snappy") {
+                builder.compression(parquet::Compression::SNAPPY);
+            } else if (property_value == "gzip") {
+                builder.compression(parquet::Compression::GZIP);
+            } else if (property_value == "brotli") {
+                builder.compression(parquet::Compression::BROTLI);
+            } else if (property_value == "zstd") {
+                builder.compression(parquet::Compression::ZSTD);
+            } else if (property_value == "lz4") {
+                builder.compression(parquet::Compression::LZ4);
+            } else if (property_value == "lzo") {
+                builder.compression(parquet::Compression::LZO);
+            } else if (property_value == "bz2") {
+                builder.compression(parquet::Compression::BZ2);
+            } else {
+                builder.compression(parquet::Compression::UNCOMPRESSED);
+            }
+        } else if (property_name == "disable_dictionary") {
+            if (property_value == "true") {
+                builder.enable_dictionary();
+            } else {
+                builder.disable_dictionary();
+            }
+        } else if (property_name == "version") {
+            if (property_value == "v1") {
+                builder.version(parquet::ParquetVersion::PARQUET_1_0);
+            } else {
+                builder.version(parquet::ParquetVersion::PARQUET_2_0);
+            }
+        }
+    }
+    _properties = builder.build();
+}
+
+Status ParquetWriterWrapper::parse_schema(const std::vector<std::vector<std::string>>& schema) {
+    parquet::schema::NodeVector fields;
+    for (auto column = schema.begin(); column != schema.end(); column++) {
+        std::string repetition_type = (*column)[0];
+        parquet::Repetition::type parquet_repetition_type = parquet::Repetition::REQUIRED;
+        if (repetition_type.find("required") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REQUIRED;
+        } else if (repetition_type.find("repeated") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REPEATED;
+        } else if (repetition_type.find("optional") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::OPTIONAL;
+        } else {
+            parquet_repetition_type = parquet::Repetition::UNDEFINED;
+        }
+
+        std::string data_type = (*column)[1];
+        parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY;
+        if (data_type == "boolean") {
+            parquet_data_type = parquet::Type::BOOLEAN;
+        } else if (data_type.find("int32") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT32;
+        } else if (data_type.find("int64") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT64;
+        } else if (data_type.find("int96") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT96;
+        } else if (data_type.find("float") != std::string::npos) {
+            parquet_data_type = parquet::Type::FLOAT;
+        } else if (data_type.find("double") != std::string::npos) {
+            parquet_data_type = parquet::Type::DOUBLE;
+        } else if (data_type.find("byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::BYTE_ARRAY;
+        } else if (data_type.find("fixed_len_byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        } else {
+            parquet_data_type = parquet::Type::UNDEFINED;
+        }
+
+        std::string column_name = (*column)[2];
+        fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, parquet_repetition_type,
+                parquet::LogicalType::None(), parquet_data_type));
+        _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+                parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
+    }
+    return Status::OK();
 }
 
 Status ParquetWriterWrapper::write(const RowBatch& row_batch) {
-    // TODO(cmy): implement
+    int num_rows = row_batch.num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        TupleRow* row = row_batch.get_row(i);
+        RETURN_IF_ERROR(_write_one_row(row));
+        _cur_writed_rows++;
+    }
     return Status::OK();
 }
 
+Status ParquetWriterWrapper::init_parquet_writer() {
+    _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to create file writer");
+    }
+    return Status::OK();
+}
+
+parquet::RowGroupWriter* ParquetWriterWrapper::get_rg_writer() {
+    if (_rg_writer == nullptr) {
+        _rg_writer = _writer->AppendBufferedRowGroup();
+    }
+    if (_cur_writed_rows > _max_row_per_group) {
+        _rg_writer->Close();
+        _rg_writer = _writer->AppendBufferedRowGroup();
+        _cur_writed_rows = 0;
+    }
+    return _rg_writer;
+}
+
+Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
+    int num_columns = _output_expr_ctxs.size();
+    if (num_columns != _str_schema.size()) {
+        return Status::InternalError("project field size is not equal to schema column size");
+    }
+    try {
+        for (int index = 0; index < num_columns; ++index) {
+            void* item = _output_expr_ctxs[index]->get_value(row);
+            switch (_output_expr_ctxs[index]->root()->type().type) {
+                case TYPE_BOOLEAN: {
+                    if (_str_schema[index][1] != "boolean") {
+                        std::stringstream ss;
+                        ss << "project field type is boolean, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::BoolWriter* col_writer = static_cast<parquet::BoolWriter*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, static_cast<bool*>(item));
+                    } else {
+                        bool default_bool = false;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &default_bool);
+                    }
+                    break;
+                }
+                case TYPE_TINYINT:
+                case TYPE_SMALLINT:
+                case TYPE_INT: {
+                    if (_str_schema[index][1] != "int32") {
+                        std::stringstream ss;
+                        ss << "project field type is tiny int/small int/int, should use int32, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::Int32Writer* col_writer = static_cast<parquet::Int32Writer*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, static_cast<int32_t*>(item));
+                    } else {
+                        int32_t default_int32 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &default_int32);
+                    }
+                    break;
+                }
+                case TYPE_BIGINT: {
+                    if (_str_schema[index][1] != "int64") {
+                        std::stringstream ss;
+                        ss << "project field type is big int, should use int64, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::Int64Writer* col_writer = static_cast<parquet::Int64Writer*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (int64_t*)(item));
+                    } else {
+                        int64_t default_int644 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &default_int644);
+                    }
+                    break;
+                }
+                case TYPE_LARGEINT: {
+                    // TODO: int_128 is not supported yet.
+                    // It is better to write a default value, because rg_writer needs all columns to have a value before flushing to disk.
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::Int64Writer* col_writer = static_cast<parquet::Int64Writer*>(rgWriter->column(index));
+                    int64_t default_int64 = 0;
+                    col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                    return Status::InvalidArgument("do not support large int type.");
+                }
+                case TYPE_FLOAT: {
+                    if (_str_schema[index][1] != "float") {
+                        std::stringstream ss;
+                        ss << "project field type is float, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::FloatWriter* col_writer = static_cast<parquet::FloatWriter*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (float_t*)(item));
+                    } else {
+                        float_t default_float = 0.0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &default_float);
+                    }
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    if (_str_schema[index][1] != "double") {
+                        std::stringstream ss;
+                        ss << "project field type is double, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::DoubleWriter* col_writer = static_cast<parquet::DoubleWriter*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (double_t*)(item));
+                    } else {
+                        double_t default_double = 0.0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &default_double);
+                    }
+                    break;
+                }
+                case TYPE_DATETIME:
+                case TYPE_DATE: {
+                    if (_str_schema[index][1] != "int64") {
+                        std::stringstream ss;
+                        ss << "project field type is date/datetime, should use int64, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::Int64Writer* col_writer = static_cast<parquet::Int64Writer*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DateTimeValue* time_val = (const DateTimeValue*)(item);
+                        int64_t timestamp = time_val->to_olap_datetime();
+                        col_writer->WriteBatch(1, nullptr, nullptr, &timestamp);
+                    } else {
+                        int64_t default_int64 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                    }
+                    break;
+                }
+                case TYPE_CHAR:
+                case TYPE_VARCHAR: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is char/varchar, should use byte_array, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::ByteArrayWriter* col_writer = static_cast<parquet::ByteArrayWriter*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const StringValue* string_val = (const StringValue*)(item);
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t*>(string_val->ptr);
+                        value.len = string_val->len;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                case TYPE_DECIMAL: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is decimal, should use byte_array, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::ByteArrayWriter* col_writer =
+                            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DecimalValue* decimal_val = reinterpret_cast<const DecimalValue*>(item);
+                        std::string decimal_str;
+                        int output_scale = _output_expr_ctxs[index]->root()->output_scale();
+                        if (output_scale > 0 && output_scale <= 30) {
+                            decimal_str = decimal_val->to_string(output_scale);
+                        } else {
+                            decimal_str = decimal_val->to_string();
+                        }
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t*>(&decimal_str);
+                        value.len = decimal_str.length();
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is decimal v2, should use byte_array, but the definition type of column "
+                           << _str_schema[index][2] << " is " << _str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                    parquet::ByteArrayWriter* col_writer = static_cast<parquet::ByteArrayWriter*>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DecimalV2Value decimal_val(
+                                reinterpret_cast<const PackedInt128*>(item)->value);
+                        std::string decimal_str;
+                        int output_scale = _output_expr_ctxs[index]->root()->output_scale();
+                        if (output_scale > 0 && output_scale <= 30) {
+                            decimal_str = decimal_val.to_string(output_scale);
+                        } else {
+                            decimal_str = decimal_val.to_string();
+                        }
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t*>(&decimal_str);
+                        value.len = decimal_str.length();
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                default: {
+                    std::stringstream ss;
+                    ss << "unsupported file format: " << _output_expr_ctxs[index]->root()->type().type;
+                    return Status::InvalidArgument(ss.str());
+                }
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Parquet write error: " << e.what();
+        return Status::InternalError(e.what());
+    }
+    return Status::OK();
+}
+
+int64_t ParquetWriterWrapper::written_len() {
+    return _outstream->get_written_len();
+}
 
 void ParquetWriterWrapper::close() {
-    // TODO(cmy): implement
+    try {
+        if (_rg_writer != nullptr) {
+            _rg_writer->Close();
+            _rg_writer = nullptr;
+        }
+        _writer->Close();
+        _outstream->Close();
+    } catch (const std::exception& e) {
+        _rg_writer = nullptr;
+        LOG(WARNING) << "Parquet writer close error: " << e.what();
+    }
 }
 
 ParquetWriterWrapper::~ParquetWriterWrapper() {
-    close();
 }
 
 } // namespace doris
diff --git a/be/src/exec/parquet_writer.h b/be/src/exec/parquet_writer.h
index 7499e81d29c1ce..0827b0299e4b60 100644
--- a/be/src/exec/parquet_writer.h
+++ b/be/src/exec/parquet_writer.h
@@ -36,16 +36,17 @@
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
+#include "runtime/tuple.h"
+#include "runtime/row_batch.h"
+#include "exprs/expr_context.h"
 
 namespace doris {
-
-class ExprContext;
 class FileWriter;
-class RowBatch;
 
 class ParquetOutputStream : public arrow::io::OutputStream {
 public:
     ParquetOutputStream(FileWriter* file_writer);
+    ParquetOutputStream(FileWriter* file_writer, const int64_t& written_len);
     virtual ~ParquetOutputStream();
 
     arrow::Status Write(const void* data, int64_t nbytes) override;
@@ -55,26 +56,52 @@ class ParquetOutputStream : public arrow::io::OutputStream {
 
     bool closed() const override { return _is_closed; }
 
+    int64_t get_written_len();
+
+    void set_written_len(int64_t written_len);
+
 private:
     FileWriter* _file_writer; // not owned
-    int64_t _cur_pos; // current write position
+    int64_t _cur_pos = 0; // current write position
     bool _is_closed = false;
+    int64_t _written_len = 0;
 };
 
 // a wrapper of parquet output stream
 class ParquetWriterWrapper {
 public:
     ParquetWriterWrapper(FileWriter* file_writer,
-            const std::vector<ExprContext*>& output_expr_ctxs);
+            const std::vector<ExprContext*>& output_expr_ctxs,
+            const std::map<std::string, std::string>& properties,
+            const std::vector<std::vector<std::string>>& schema);
     virtual ~ParquetWriterWrapper();
 
     Status write(const RowBatch& row_batch);
 
+    Status init_parquet_writer();
+
+    Status _write_one_row(TupleRow* row);
+
     void close();
 
+    void parse_properties(const std::map<std::string, std::string>& propertie_map);
+
+    Status parse_schema(const std::vector<std::vector<std::string>>& schema);
+
+    parquet::RowGroupWriter* get_rg_writer();
+
+    int64_t written_len();
+
 private:
-    ParquetOutputStream* _outstream;
+    std::shared_ptr<ParquetOutputStream> _outstream;
+    std::shared_ptr<parquet::WriterProperties> _properties;
+    std::shared_ptr<parquet::schema::GroupNode> _schema;
+    std::unique_ptr<parquet::ParquetFileWriter> _writer;
     const std::vector<ExprContext*>& _output_expr_ctxs;
+    std::vector<std::vector<std::string>> _str_schema;
+    int64_t _cur_writed_rows = 0;
+    parquet::RowGroupWriter* _rg_writer;
+    const int64_t _max_row_per_group = 10;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp
index 758f8528712f69..48f73d80dd40f3 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -112,20 +112,20 @@ Status FileResultWriter::_create_file_writer(const std::string& file_name) {
                 _file_opts->broker_properties, file_name, 0 /*start offset*/);
     }
     RETURN_IF_ERROR(_file_writer->open());
-
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         // just use file writer is enough
         break;
     case TFileFormatType::FORMAT_PARQUET:
-        _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs);
+        _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs,
+                _file_opts->file_properties, _file_opts->schema);
         break;
     default:
         return Status::InternalError(
                 strings::Substitute("unsupported file format: $0", _file_opts->file_format));
     }
     LOG(INFO) << "create file for exporting query result. file name: " << file_name
-              << ". query id: " << print_id(_state->query_id());
query id: " << print_id(_state->query_id()) << " format:" << _file_opts->file_format; return Status::OK(); } @@ -167,7 +167,7 @@ Status FileResultWriter::append_row_batch(const RowBatch* batch) { SCOPED_TIMER(_append_row_batch_timer); if (_parquet_writer != nullptr) { - RETURN_IF_ERROR(_parquet_writer->write(*batch)); + RETURN_IF_ERROR(_write_parquet_file(*batch)); } else { RETURN_IF_ERROR(_write_csv_file(*batch)); } @@ -176,6 +176,13 @@ Status FileResultWriter::append_row_batch(const RowBatch* batch) { return Status::OK(); } +Status FileResultWriter::_write_parquet_file(const RowBatch& batch) { + RETURN_IF_ERROR(_parquet_writer->write(batch)); + // split file if exceed limit + RETURN_IF_ERROR(_create_new_file_if_exceed_size()); + return Status::OK(); +} + Status FileResultWriter::_write_csv_file(const RowBatch& batch) { int num_rows = batch.num_rows(); for (int i = 0; i < num_rows; ++i) { @@ -345,11 +352,12 @@ Status FileResultWriter::_create_new_file_if_exceed_size() { Status FileResultWriter::_close_file_writer(bool done, bool only_close) { if (_parquet_writer != nullptr) { _parquet_writer->close(); + _current_written_bytes = _parquet_writer->written_len(); + COUNTER_UPDATE(_written_data_bytes, _current_written_bytes); delete _parquet_writer; _parquet_writer = nullptr; - if (!done) { - //TODO(cmy): implement parquet writer later - } + delete _file_writer; + _file_writer = nullptr; } else if (_file_writer != nullptr) { _file_writer->close(); delete _file_writer; diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h index f264c6d840e3e6..f00a09f10735c1 100644 --- a/be/src/runtime/file_result_writer.h +++ b/be/src/runtime/file_result_writer.h @@ -40,6 +40,8 @@ struct ResultFileOptions { std::vector broker_addresses; std::map broker_properties; std::string success_file_name = ""; + std::vector> schema; + std::map file_properties; ResultFileOptions(const TResultFileSinkOptions& t_opt) { file_path = t_opt.file_path; @@ -60,6 +62,12 @@ struct ResultFileOptions { if (t_opt.__isset.success_file_name) { success_file_name = t_opt.success_file_name; } + if (t_opt.__isset.schema) { + schema = t_opt.schema; + } + if (t_opt.__isset.file_properties) { + file_properties = t_opt.file_properties; + } } }; @@ -82,6 +90,7 @@ class FileResultWriter final : public ResultWriter { private: Status _write_csv_file(const RowBatch& batch); + Status _write_parquet_file(const RowBatch& batch); Status _write_one_row_as_csv(TupleRow* row); // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 05f975ddcbbbd0..0d571d46b296ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -18,6 +18,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeNameFormat; @@ -25,15 +26,15 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TResultFileSinkOptions; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import com.clearspring.analytics.util.Lists; import com.google.common.base.Strings; 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +47,8 @@ public class OutFileClause {
     public static final List<String> RESULT_COL_NAMES = Lists.newArrayList();
     public static final List<PrimitiveType> RESULT_COL_TYPES = Lists.newArrayList();
+    public static final List<String> PARQUET_REPETITION_TYPES = Lists.newArrayList();
+    public static final List<String> PARQUET_DATA_TYPES = Lists.newArrayList();
 
     static {
         RESULT_COL_NAMES.add("FileNumber");
@@ -57,6 +60,19 @@ public class OutFileClause {
         RESULT_COL_TYPES.add(PrimitiveType.BIGINT);
         RESULT_COL_TYPES.add(PrimitiveType.BIGINT);
         RESULT_COL_TYPES.add(PrimitiveType.VARCHAR);
+
+        PARQUET_REPETITION_TYPES.add("required");
+        PARQUET_REPETITION_TYPES.add("repeated");
+        PARQUET_REPETITION_TYPES.add("optional");
+
+        PARQUET_DATA_TYPES.add("boolean");
+        PARQUET_DATA_TYPES.add("int32");
+        PARQUET_DATA_TYPES.add("int64");
+        PARQUET_DATA_TYPES.add("int96");
+        PARQUET_DATA_TYPES.add("byte_array");
+        PARQUET_DATA_TYPES.add("float");
+        PARQUET_DATA_TYPES.add("double");
+        PARQUET_DATA_TYPES.add("fixed_len_byte_array");
     }
 
     public static final String LOCAL_FILE_PREFIX = "file:///";
@@ -66,11 +82,14 @@ public class OutFileClause {
     private static final String PROP_LINE_DELIMITER = "line_delimiter";
     private static final String PROP_MAX_FILE_SIZE = "max_file_size";
    private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
+    private static final String PARQUET_PROP_PREFIX = "parquet.";
+    private static final String SCHEMA = "schema";
 
     private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
     private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB
     private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB
 
+
     private String filePath;
     private String format;
     private Map<String, String> properties;
@@ -85,6 +104,8 @@ public class OutFileClause {
     // If set to true, the brokerDesc must be null.
     private boolean isLocalOutput = false;
     private String successFileName = "";
+    private List<List<String>> schema = new ArrayList<>();
+    private Map<String, String> fileProperties = new HashMap<>();
 
     public OutFileClause(String filePath, String format, Map<String, String> properties) {
         this.filePath = filePath;
@@ -121,10 +142,19 @@ public BrokerDesc getBrokerDesc() {
 
     public void analyze(Analyzer analyzer) throws AnalysisException {
         analyzeFilePath();
-        if (!format.equals("csv")) {
-            throw new AnalysisException("Only support CSV format");
+        if (Strings.isNullOrEmpty(filePath)) {
+            throw new AnalysisException("Must specify file in OUTFILE clause");
+        }
+        switch (this.format) {
+            case "csv":
+                fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+                break;
+            case "parquet":
+                fileFormatType = TFileFormatType.FORMAT_PARQUET;
+                break;
+            default:
+                throw new AnalysisException("format:" + this.format + " is not supported.");
         }
-        fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
 
         analyzeProperties();
 
@@ -135,6 +165,16 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
         }
     }
 
+    public void analyze(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
+        analyze(analyzer);
+        List<SelectListItem> items = stmt.getSelectList().getItems();
+        for (SelectListItem item : items) {
+            if (item.getExpr().getType() == Type.LARGEINT && isParquetFormat()) {
+                throw new AnalysisException("currently parquet do not support largeint type");
+            }
+        }
+    }
+
     private void analyzeFilePath() throws AnalysisException {
         if (Strings.isNullOrEmpty(filePath)) {
             throw new AnalysisException("Must specify file in OUTFILE clause");
@@ -166,7 +206,7 @@ private void analyzeProperties() throws AnalysisException {
             columnSeparator = properties.get(PROP_COLUMN_SEPARATOR);
             processedPropKeys.add(PROP_COLUMN_SEPARATOR);
         }
-        
+
         if (properties.containsKey(PROP_LINE_DELIMITER)) {
             if (!isCsvFormat()) {
                 throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format");
@@ -189,11 +229,16 @@ private void analyzeProperties() throws AnalysisException {
             processedPropKeys.add(PROP_SUCCESS_FILE_NAME);
         }
 
+        if (this.fileFormatType == TFileFormatType.FORMAT_PARQUET) {
+            getParquetProperties(processedPropKeys);
+        }
+
         if (processedPropKeys.size() != properties.size()) {
             LOG.debug("{} vs {}", processedPropKeys, properties);
             throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
                     .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
         }
+
     }
 
     private void getBrokerProperties(Set<String> processedPropKeys) {
@@ -202,7 +247,7 @@ private void getBrokerProperties(Set<String> processedPropKeys) {
         }
         String brokerName = properties.get(PROP_BROKER_NAME);
         processedPropKeys.add(PROP_BROKER_NAME);
-        
+
         Map<String, String> brokerProps = Maps.newHashMap();
         Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
         while (iter.hasNext()) {
@@ -216,6 +261,54 @@ private void getBrokerProperties(Set<String> processedPropKeys) {
         brokerDesc = new BrokerDesc(brokerName, brokerProps);
     }
 
+    /**
+     * example:
+     * SELECT citycode FROM table1 INTO OUTFILE "file:///root/doris/"
+     * FORMAT AS PARQUET PROPERTIES ("schema"="required,int32,siteid;", "parquet.compression"="snappy");
+     *
+     * schema: it defines the schema of the parquet file; it consists of 3 fields: repetition type, data type, column name.
+     * Multiple columns are separated by `;`.
+     *
+     * Properties prefixed with 'parquet.' define the properties of the parquet file;
+     * currently only compression, disable_dictionary and version are supported.
+     */
+    private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
+        String schema = properties.get(SCHEMA);
+        if (schema == null || schema.length() <= 0) {
+            throw new AnalysisException("schema is required for parquet file");
+        }
+        schema = schema.replace(" ", "");
+        schema = schema.toLowerCase();
+        String[] schemas = schema.split(";");
+        for (String item : schemas) {
+            String[] properties = item.split(",");
+            if (properties.length != 3) {
+                throw new AnalysisException("must only contains repetition type/column type/column name");
+            }
+            if (!PARQUET_REPETITION_TYPES.contains(properties[0])) {
+                throw new AnalysisException("unknown repetition type");
+            }
+            if (!properties[0].equalsIgnoreCase("required")) {
+                throw new AnalysisException("currently only support required type");
+            }
+            if (!PARQUET_DATA_TYPES.contains(properties[1])) {
+                throw new AnalysisException("data type is not supported:" + properties[1]);
+            }
+            List<String> column = new ArrayList<>();
+            column.addAll(Arrays.asList(properties));
+            this.schema.add(column);
+        }
+        processedPropKeys.add(SCHEMA);
+        Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, String> entry = iter.next();
+            if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
+                processedPropKeys.add(entry.getKey());
+                fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue());
+            }
+        }
+    }
+
     private boolean isCsvFormat() {
         return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2
                 || fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE
@@ -226,6 +319,10 @@ private boolean isCsvFormat() {
                 || fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN;
     }
 
+    private boolean isParquetFormat() {
+        return fileFormatType == TFileFormatType.FORMAT_PARQUET;
+    }
+
     @Override
     public OutFileClause clone() {
         return new OutFileClause(this);
@@ -257,6 +354,10 @@ public TResultFileSinkOptions toSinkOptions() {
         if (!Strings.isNullOrEmpty(successFileName)) {
             sinkOptions.setSuccessFileName(successFileName);
         }
+        if (isParquetFormat()) {
+            sinkOptions.setSchema(this.schema);
+            sinkOptions.setFileProperties(this.fileProperties);
+        }
         return sinkOptions;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
index 4766562386cd4d..3460cd513c6a9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
@@ -23,14 +23,11 @@
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.rewrite.ExprRewriter;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
@@ -161,7 +158,6 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         super.analyze(analyzer);
         analyzeLimit(analyzer);
         if (hasWithClause()) withClause_.analyze(analyzer);
-        if (hasOutFileClause()) outFileClause.analyze(analyzer);
     }
 
     private void analyzeLimit(Analyzer analyzer) throws AnalysisException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index af0f83df00dec8..6c2b466566978d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -529,6 +529,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
                 LOG.debug("post-analysis " + aggInfo.debugString());
             }
         }
+        if (hasOutFileClause()) {
+            outFileClause.analyze(analyzer, this);
+        }
     }
 
     public List<TupleId> getTableRefIds() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index 97e36a7fbdeb68..728e9944686f4a 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.analysis;
 
+import mockit.Mock;
+import mockit.MockUp;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.Util;
@@ -27,7 +29,6 @@
 import org.apache.doris.utframe.UtFrameUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -38,8 +39,6 @@
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
-import mockit.Mock;
-import mockit.MockUp;
 
 public class SelectStmtTest {
     private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
@@ -58,8 +57,8 @@ public static void setUp() throws Exception {
         Config.enable_batch_delete_by_default = true;
         Config.enable_http_server_v2 = false;
         UtFrameUtils.createMinDorisCluster(runningDir);
-        String createTblStmtStr = "create table db1.tbl1(k1 varchar(32), k2 varchar(32), k3 varchar(32), k4 int) "
-                + "AGGREGATE KEY(k1, k2,k3,k4) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
+        String createTblStmtStr = "create table db1.tbl1(k1 varchar(32), k2 varchar(32), k3 varchar(32), k4 int, k5 largeint) "
+                + "AGGREGATE KEY(k1, k2,k3,k4,k5) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
         String createBaseAllStmtStr = "create table db1.baseall(k1 int, k2 varchar(32)) distributed by hash(k1) "
                 + "buckets 3 properties('replication_num' = '1');";
         String createPratitionTableStr = "CREATE TABLE db1.partition_table (\n" +
@@ -580,4 +579,68 @@ public void testGetTableRefs() throws Exception {
         Assert.assertEquals("table1", tblRefs.get(0).getName().getTbl());
         Assert.assertEquals("table2", tblRefs.get(1).getName().getTbl());
     }
+
+    @Test
+    public void testOutfile() throws Exception {
+        ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+        Config.enable_outfile_to_local = true;
+        String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;required,byte_array,username;\");";
+        dorisAssert.query(sql).explainQuery();
+        // must contain schema
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET;";
+        try {
+            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("schema is required for parquet file"));
+        }
+        // schema must contain 3 fields
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"int32,siteid;\");";
+        try {
+            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("must only contains repetition type/column type/column name"));
+        }
+        // unknown repetition type
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeat, int32,siteid;\");";
+        try {
+            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("unknown repetition type"));
+        }
+        // only support required type
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeated,int32,siteid;\");";
+        try {
+            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("currently only support required type"));
+        }
+        // unknown data type
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int128,siteid;\");";
+        try {
+            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("data type is not supported"));
+        }
+        // contains parquet properties
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;\", 'parquet.compression'='snappy');";
+        dorisAssert.query(sql).explainQuery();
+        // support parquet for broker
+        sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
+
+        // do not support large int type
+        try {
+            sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
+            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type"));
+        }
+
+        // do not support large int type, contains function
+        try {
+            sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
            SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type"));
+        }
+    }
 }
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index c3ebdf0256863c..b4ff8e1d993c9d 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -49,6 +49,8 @@ struct TResultFileSinkOptions {
     6: optional list<Types.TNetworkAddress> broker_addresses; // only for remote file
     7: optional map<string, string> broker_properties // only for remote file
     8: optional string success_file_name
+    9: optional list<list<string>> schema // for parquet/orc file
+    10: optional map<string, string> file_properties // for parquet/orc file
 }
 
 struct TMemoryScratchSink {
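
Usage sketch (not part of the patch): with this change applied, a query result can be exported to Parquet through the new OUTFILE syntax. The statement below is illustrative only; the table, columns, path and broker name are hypothetical, while the "schema" string and the "parquet."-prefixed properties follow the format documented in OutFileClause and exercised by the tests above.

SELECT siteid, username FROM table1
INTO OUTFILE "hdfs://host:port/user/result_"
FORMAT AS PARQUET
PROPERTIES
(
    "broker.name" = "my_broker",
    "schema" = "required,int32,siteid;required,byte_array,username;",
    "parquet.compression" = "snappy"
);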