Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 104 additions & 9 deletions be/src/runtime/file_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "runtime/buffer_control_block.h"
#include "runtime/primitive_type.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple_row.h"
#include "service/backend_options.h"
#include "util/date_func.h"
#include "util/file_utils.h"
#include "util/mysql_row_buffer.h"
#include "util/types.h"
#include "util/uid_util.h"

Expand All @@ -37,10 +41,12 @@ const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;

FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile)
RuntimeProfile* parent_profile,
BufferControlBlock* sinker)
: _file_opts(file_opts),
_output_expr_ctxs(output_expr_ctxs),
_parent_profile(parent_profile) {}
_parent_profile(parent_profile),
_sinker(sinker) {}

FileResultWriter::~FileResultWriter() {
_close_file_writer(true);
Expand All @@ -50,7 +56,7 @@ Status FileResultWriter::init(RuntimeState* state) {
_state = state;
_init_profile();

RETURN_IF_ERROR(_create_file_writer());
RETURN_IF_ERROR(_create_next_file_writer());
return Status::OK();
}

Expand All @@ -64,8 +70,40 @@ void FileResultWriter::_init_profile() {
_written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
}

Status FileResultWriter::_create_file_writer() {
std::string file_name = _get_next_file_name();
Status FileResultWriter::_create_success_file() {
std::string file_name;
RETURN_IF_ERROR(_get_success_file_name(&file_name));
RETURN_IF_ERROR(_create_file_writer(file_name));
RETURN_IF_ERROR(_close_file_writer(true, true));
return Status::OK();
}

Status FileResultWriter::_get_success_file_name(std::string* file_name) {
std::stringstream ss;
ss << _file_opts->file_path << _file_opts->success_file_name;
*file_name = ss.str();
if (_file_opts->is_local_file) {
// For local file writer, the file_path is a local dir.
// Here we do a simple security verification by checking whether the file exists.
// Because the file path is currently arbitrarily specified by the user,
// Doris is not responsible for ensuring the correctness of the path.
// This is just to prevent overwriting the existing file.
if (FileUtils::check_exist(*file_name)) {
return Status::InternalError("File already exists: " + *file_name
+ ". Host: " + BackendOptions::get_localhost());
}
}

return Status::OK();
}

Status FileResultWriter::_create_next_file_writer() {
std::string file_name;
RETURN_IF_ERROR(_get_next_file_name(&file_name));
return _create_file_writer(file_name);
}

Status FileResultWriter::_create_file_writer(const std::string& file_name) {
if (_file_opts->is_local_file) {
_file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
} else {
Expand All @@ -92,10 +130,23 @@ Status FileResultWriter::_create_file_writer() {
}

// file name format as: my_prefix_0.csv
std::string FileResultWriter::_get_next_file_name() {
Status FileResultWriter::_get_next_file_name(std::string* file_name) {
std::stringstream ss;
ss << _file_opts->file_path << (_file_idx++) << "." << _file_format_to_name();
return ss.str();
*file_name = ss.str();
if (_file_opts->is_local_file) {
// For local file writer, the file_path is a local dir.
// Here we do a simple security verification by checking whether the file exists.
// Because the file path is currently arbitrarily specified by the user,
// Doris is not responsible for ensuring the correctness of the path.
// This is just to prevent overwriting the existing file.
if (FileUtils::check_exist(*file_name)) {
return Status::InternalError("File already exists: " + *file_name
+ ". Host: " + BackendOptions::get_localhost());
}
}

return Status::OK();
}

std::string FileResultWriter::_file_format_to_name() {
Expand Down Expand Up @@ -291,7 +342,7 @@ Status FileResultWriter::_create_new_file_if_exceed_size() {
return Status::OK();
}

Status FileResultWriter::_close_file_writer(bool done) {
Status FileResultWriter::_close_file_writer(bool done, bool only_close) {
if (_parquet_writer != nullptr) {
_parquet_writer->close();
delete _parquet_writer;
Expand All @@ -305,13 +356,57 @@ Status FileResultWriter::_close_file_writer(bool done) {
_file_writer = nullptr;
}

if (only_close) {
return Status::OK();
}

if (!done) {
// not finished, create new file writer for next file
RETURN_IF_ERROR(_create_file_writer());
RETURN_IF_ERROR(_create_next_file_writer());
} else {
// All data is written to file, send statistic result
if (_file_opts->success_file_name != "") {
// write success file, just need to touch an empty file
RETURN_IF_ERROR(_create_success_file());
}
RETURN_IF_ERROR(_send_result());
}
return Status::OK();
}

Status FileResultWriter::_send_result() {
if (_is_result_sent) {
return Status::OK();
}
_is_result_sent = true;

// The final stat result include:
// FileNumber, TotalRows, FileSize and URL
// The type of these field should be conssitent with types defined
// in OutFileClause.java of FE.
MysqlRowBuffer row_buffer;
row_buffer.push_int(_file_idx); // file number
row_buffer.push_bigint(_written_rows_counter->value()); // total rows
row_buffer.push_bigint(_written_data_bytes->value()); // file size
std::string localhost = BackendOptions::get_localhost();
row_buffer.push_string(localhost.c_str(), localhost.length()); // url

TFetchDataResult* result = new (std::nothrow) TFetchDataResult();
result->result_batch.rows.resize(1);
result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());

Status st = _sinker->add_batch(result);
if (st.ok()) {
result = nullptr;
} else {
LOG(WARNING) << "failed to send outfile result: " << st.get_error_msg();
}

delete result;
result = nullptr;
return st;
}

Status FileResultWriter::close() {
// the following 2 profile "_written_rows_counter" and "_writer_close_timer"
// must be outside the `_close_file_writer()`.
Expand Down
29 changes: 24 additions & 5 deletions be/src/runtime/file_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct ResultFileOptions {
size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
std::vector<TNetworkAddress> broker_addresses;
std::map<std::string, std::string> broker_properties;
std::string success_file_name = "";

ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
Expand All @@ -56,21 +57,29 @@ struct ResultFileOptions {
if (t_opt.__isset.broker_properties) {
broker_properties = t_opt.broker_properties;
}
if (t_opt.__isset.success_file_name) {
success_file_name = t_opt.success_file_name;
}
}
};

class BufferControlBlock;
// write result to file
class FileResultWriter final : public ResultWriter {
public:
FileResultWriter(const ResultFileOptions* file_option,
const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile);
RuntimeProfile* parent_profile,
BufferControlBlock* sinker);
virtual ~FileResultWriter();

virtual Status init(RuntimeState* state) override;
virtual Status append_row_batch(const RowBatch* batch) override;
virtual Status close() override;

// file result writer always return statistic result in one row
virtual int64_t get_written_rows() const { return 1; }

private:
Status _write_csv_file(const RowBatch& batch);
Status _write_one_row_as_csv(TupleRow* row);
Expand All @@ -80,14 +89,20 @@ class FileResultWriter final : public ResultWriter {
Status _flush_plain_text_outstream(bool eos);
void _init_profile();

Status _create_file_writer();
Status _create_file_writer(const std::string& file_name);
Status _create_next_file_writer();
Status _create_success_file();
// get next export file name
std::string _get_next_file_name();
Status _get_next_file_name(std::string* file_name);
Status _get_success_file_name(std::string* file_name);
std::string _file_format_to_name();
// close file writer, and if !done, it will create new writer for next file
Status _close_file_writer(bool done);
// close file writer, and if !done, it will create new writer for next file.
// if only_close is true, this method will just close the file writer and return.
Status _close_file_writer(bool done, bool only_close = false);
// create a new file if current file size exceed limit
Status _create_new_file_if_exceed_size();
// send the final statistic result
Status _send_result();

private:
RuntimeState* _state; // not owned, set when init
Expand Down Expand Up @@ -126,6 +141,10 @@ class FileResultWriter final : public ResultWriter {
RuntimeProfile::Counter* _written_rows_counter = nullptr;
// bytes of written data
RuntimeProfile::Counter* _written_data_bytes = nullptr;

BufferControlBlock* _sinker;
// set to true if the final statistic result is sent
bool _is_result_sent = false;
};

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/runtime/result_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Status ResultSink::prepare(RuntimeState* state) {
case TResultSinkType::FILE:
CHECK(_file_opts.get() != nullptr);
_writer.reset(new (std::nothrow)
FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile));
FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile, _sender.get()));
break;
default:
return Status::InternalError("Unknown result sink type");
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ResultWriter {

virtual Status close() = 0;

int64_t get_written_rows() const { return _written_rows; }
virtual int64_t get_written_rows() const { return _written_rows; }

static const std::string NULL_IN_CSV;

Expand Down
31 changes: 22 additions & 9 deletions docs/en/administrator-guide/outfile.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,22 @@ WITH BROKER `broker_name`
4. Example 4

Export simple query results to the file `cos://${bucket_name}/path/result.txt`. Specify the export format as CSV.
And create a mark file after export finished.

```
select k1,k2,v1 from tbl1 limit 100000
into outfile "s3a://my_bucket/export/my_file_"
FORMAT AS CSV
PROPERTIES
(
select k1,k2,v1 from tbl1 limit 100000
into outfile "s3a://my_bucket/export/my_file_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "hdfs_broker",
"broker.fs.s3a.access.key" = "xxx",
"broker.fs.s3a.secret.key" = "xxxx",
"broker.fs.s3a.endpoint" = "https://cos.xxxxxx.myqcloud.com/",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "1024MB"
"max_file_size" = "1024MB",
"success_file_name" = "SUCCESS"
)
```

Expand All @@ -188,19 +190,30 @@ WITH BROKER `broker_name`
## Return result

The command is a synchronization command. The command returns, which means the operation is over.
At the same time, a row of results will be returned to show the exported execution result.

If it exports and returns normally, the result is as follows:

```
mysql> SELECT * FROM tbl INTO OUTFILE ... Query OK, 100000 row affected (5.86 sec)
mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
+------------+-----------+----------+--------------+
| FileNumber | TotalRows | FileSize | URL |
+------------+-----------+----------+--------------+
| 1 | 2 | 8 | 192.168.1.10 |
+------------+-----------+----------+--------------+
1 row in set (0.05 sec)
```

`100000 row affected` Indicates the size of the exported result set.
* FileNumber: The number of files finally generated.
* TotalRows: The number of rows in the result set.
* FileSize: The total size of the exported file. Unit byte.
* URL: If it is exported to a local disk, the Compute Node to which it is exported is displayed here.

If the execution is incorrect, an error message will be returned, such as:

```
mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
mysql> SELECT * FROM tbl INTO OUTFILE ...
ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
```

## Notice
Expand Down
Loading