389 changes: 379 additions & 10 deletions be/src/exec/parquet_writer.cpp

Large diffs are not rendered by default.
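
Because the large parquet_writer.cpp diff is collapsed, here is a minimal sketch of how the new ParquetOutputStream members (shown in the header diff below) plausibly fit together. The FileWriter::write(buf, len, &written) signature is an assumption inferred from the rest of this PR, not confirmed by the rendered diff:

arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) {
    if (_is_closed) {
        return arrow::Status::IOError("write on closed ParquetOutputStream");
    }
    size_t written_len = 0;
    // assumed signature: Status FileWriter::write(const uint8_t*, size_t, size_t*)
    Status st = _file_writer->write(reinterpret_cast<const uint8_t*>(data),
                                    nbytes, &written_len);
    if (!st.ok()) {
        return arrow::Status::IOError("fail to write to underlying FileWriter");
    }
    _cur_pos += written_len;      // position reported back to arrow via Tell()
    _written_len += written_len;  // total bytes, exposed via get_written_len()
    return arrow::Status::OK();
}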

39 changes: 33 additions & 6 deletions be/src/exec/parquet_writer.h
@@ -36,16 +36,17 @@
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/tuple.h"
#include "runtime/row_batch.h"
#include "exprs/expr_context.h"

namespace doris {

class ExprContext;
class FileWriter;
class RowBatch;

class ParquetOutputStream : public arrow::io::OutputStream {
public:
ParquetOutputStream(FileWriter* file_writer);
ParquetOutputStream(FileWriter* file_writer, const int64_t& written_len);
virtual ~ParquetOutputStream();

arrow::Status Write(const void* data, int64_t nbytes) override;
@@ -55,26 +56,52 @@ class ParquetOutputStream : public arrow::io::OutputStream {

bool closed() const override { return _is_closed; }

int64_t get_written_len();

void set_written_len(int64_t written_len);

private:
FileWriter* _file_writer; // not owned
int64_t _cur_pos; // current write position
int64_t _cur_pos = 0; // current write position
bool _is_closed = false;
int64_t _written_len = 0;
};

// a wrapper around the parquet output stream
class ParquetWriterWrapper {
public:
ParquetWriterWrapper(FileWriter* file_writer,
const std::vector<ExprContext*>& output_expr_ctxs);
const std::vector<ExprContext*>& output_expr_ctxs,
const std::map<std::string, std::string>& properties,
const std::vector<std::vector<std::string>>& schema);
virtual ~ParquetWriterWrapper();

Status write(const RowBatch& row_batch);

Status init_parquet_writer();

Status _write_one_row(TupleRow* row);

void close();

void parse_properties(const std::map<std::string, std::string>& properties_map);

Status parse_schema(const std::vector<std::vector<std::string>>& schema);

parquet::RowGroupWriter* get_rg_writer();

int64_t written_len();

private:
ParquetOutputStream* _outstream;
std::shared_ptr<ParquetOutputStream> _outstream;
std::shared_ptr<parquet::WriterProperties> _properties;
std::shared_ptr<parquet::schema::GroupNode> _schema;
std::unique_ptr<parquet::ParquetFileWriter> _writer;
const std::vector<ExprContext*>& _output_expr_ctxs;
std::vector<std::vector<std::string>> _str_schema;
int64_t _cur_written_rows = 0;
parquet::RowGroupWriter* _rg_writer;
const int64_t _max_row_per_group = 10;
};

} // namespace doris
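
A hedged reconstruction of the row-group rotation implied by _cur_written_rows and _max_row_per_group above; the real body lives in the collapsed parquet_writer.cpp, and the buffered-row-group calls are assumptions based on the parquet-cpp API:

parquet::RowGroupWriter* ParquetWriterWrapper::get_rg_writer() {
    if (_rg_writer == nullptr) {
        // buffered mode lets the columns of one row group be filled row by row
        _rg_writer = _writer->AppendBufferedRowGroup();
    }
    if (_cur_written_rows > _max_row_per_group) {
        // rotate to a fresh row group once the cap is reached
        _rg_writer->Close();
        _rg_writer = _writer->AppendBufferedRowGroup();
        _cur_written_rows = 0;
    }
    return _rg_writer;
}
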
22 changes: 15 additions & 7 deletions be/src/runtime/file_result_writer.cpp
@@ -112,20 +112,20 @@ Status FileResultWriter::_create_file_writer(const std::string& file_name) {
_file_opts->broker_properties, file_name, 0 /*start offset*/);
}
RETURN_IF_ERROR(_file_writer->open());

switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
// the plain file writer is enough
break;
case TFileFormatType::FORMAT_PARQUET:
_parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs);
_parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs,
_file_opts->file_properties, _file_opts->schema);
break;
default:
return Status::InternalError(
strings::Substitute("unsupported file format: $0", _file_opts->file_format));
}
LOG(INFO) << "create file for exporting query result. file name: " << file_name
<< ". query id: " << print_id(_state->query_id());
<< ". query id: " << print_id(_state->query_id()) << " format:" << _file_opts->file_format;
return Status::OK();
}

@@ -167,7 +167,7 @@ Status FileResultWriter::append_row_batch(const RowBatch* batch) {

SCOPED_TIMER(_append_row_batch_timer);
if (_parquet_writer != nullptr) {
RETURN_IF_ERROR(_parquet_writer->write(*batch));
RETURN_IF_ERROR(_write_parquet_file(*batch));
} else {
RETURN_IF_ERROR(_write_csv_file(*batch));
}
@@ -176,6 +176,13 @@
return Status::OK();
}

Status FileResultWriter::_write_parquet_file(const RowBatch& batch) {
RETURN_IF_ERROR(_parquet_writer->write(batch));
// split the file if it exceeds the size limit
RETURN_IF_ERROR(_create_new_file_if_exceed_size());
return Status::OK();
}

Status FileResultWriter::_write_csv_file(const RowBatch& batch) {
int num_rows = batch.num_rows();
for (int i = 0; i < num_rows; ++i) {
@@ -345,11 +352,12 @@ Status FileResultWriter::_create_new_file_if_exceed_size() {
Status FileResultWriter::_close_file_writer(bool done, bool only_close) {
if (_parquet_writer != nullptr) {
_parquet_writer->close();
_current_written_bytes = _parquet_writer->written_len();
COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
delete _parquet_writer;
_parquet_writer = nullptr;
if (!done) {
//TODO(cmy): implement parquet writer later
}
delete _file_writer;
_file_writer = nullptr;
} else if (_file_writer != nullptr) {
_file_writer->close();
delete _file_writer;
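
The hunks above call _create_new_file_if_exceed_size() without showing its body; a hedged sketch of the check, where max_file_size_bytes is an assumed field of the options (only _current_written_bytes and _close_file_writer appear in this diff):

Status FileResultWriter::_create_new_file_if_exceed_size() {
    // max_file_size_bytes is an assumption; it is not visible in this diff
    if (_current_written_bytes < _file_opts->max_file_size_bytes) {
        return Status::OK();
    }
    // done=false: the query is still producing rows, so a successor file is
    // expected; only_close=false keeps the normal close-and-reopen path
    RETURN_IF_ERROR(_close_file_writer(false /*done*/, false /*only_close*/));
    _current_written_bytes = 0;
    return Status::OK();
}
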
9 changes: 9 additions & 0 deletions be/src/runtime/file_result_writer.h
@@ -40,6 +40,8 @@ struct ResultFileOptions {
std::vector<TNetworkAddress> broker_addresses;
std::map<std::string, std::string> broker_properties;
std::string success_file_name = "";
std::vector<std::vector<std::string>> schema;
std::map<std::string, std::string> file_properties;

ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
@@ -60,6 +62,12 @@
if (t_opt.__isset.success_file_name) {
success_file_name = t_opt.success_file_name;
}
if (t_opt.__isset.schema) {
schema = t_opt.schema;
}
if (t_opt.__isset.file_properties) {
file_properties = t_opt.file_properties;
}
}
};

@@ -82,6 +90,7 @@ class FileResultWriter final : public ResultWriter {

private:
Status _write_csv_file(const RowBatch& batch);
Status _write_parquet_file(const RowBatch& batch);
Status _write_one_row_as_csv(TupleRow* row);

// if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
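
For illustration, a hypothetical way the two new ResultFileOptions fields could be filled; the {repetition, type, name} triple layout and the property key are assumptions about what parse_schema() and parse_properties() accept, since parquet_writer.cpp is collapsed above:

#include <map>
#include <string>
#include <vector>
#include "exec/parquet_writer.h"

// file_writer and output_expr_ctxs would come from FileResultWriter's own state.
doris::Status example(doris::FileWriter* file_writer,
                      const std::vector<doris::ExprContext*>& output_expr_ctxs) {
    std::vector<std::vector<std::string>> schema = {
        {"required", "int32", "siteid"},   // assumed {repetition, type, column name}
        {"optional", "byte_array", "city"},
    };
    std::map<std::string, std::string> file_properties = {
        {"compression", "snappy"},         // assumed property key/value
    };
    doris::ParquetWriterWrapper writer(file_writer, output_expr_ctxs,
                                       file_properties, schema);
    RETURN_IF_ERROR(writer.init_parquet_writer());
    // ... call writer.write(row_batch) per batch ...
    writer.close();
    return doris::Status::OK();
}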