Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct AuthInfo {
std::string user_ip;
// -1 as unset
int64_t auth_code = -1;
std::string auth_code_uuid = "";
};

template <class T>
Expand All @@ -39,6 +40,9 @@ void set_request_auth(T* req, const AuthInfo& auth) {
// so they have to be set.
req->user = "";
req->passwd = "";
} else if (auth.auth_code_uuid != "") {
req->__isset.auth_code_uuid = true;
req->auth_code_uuid = auth.auth_code_uuid;
} else {
req->user = auth.user;
req->passwd = auth.passwd;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ set(EXEC_FILES
olap_common.cpp
tablet_info.cpp
tablet_sink.cpp
plain_binary_line_reader.cpp
plain_text_line_reader.cpp
csv_scan_node.cpp
csv_scanner.cpp
Expand Down
93 changes: 60 additions & 33 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "exec/decompressor.h"
#include "exec/exec_node.h"
#include "exec/local_file_reader.h"
#include "exec/plain_binary_line_reader.h"
#include "exec/plain_text_line_reader.h"
#include "exec/s3_reader.h"
#include "exec/text_converter.h"
Expand Down Expand Up @@ -222,6 +223,7 @@ Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
switch (type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
case TFileFormatType::FORMAT_JSON:
case TFileFormatType::FORMAT_PROTO:
compress_type = CompressType::UNCOMPRESSED;
break;
case TFileFormatType::FORMAT_CSV_GZ:
Expand Down Expand Up @@ -279,6 +281,7 @@ Status BrokerScanner::open_line_reader() {
// _decompressor may be NULL if this is not a compressed file
RETURN_IF_ERROR(create_decompressor(range.format_type));

_file_format_type = range.format_type;
// open line reader
switch (range.format_type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
Expand All @@ -290,6 +293,9 @@ Status BrokerScanner::open_line_reader() {
_cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor,
size, _line_delimiter, _line_delimiter_length);
break;
case TFileFormatType::FORMAT_PROTO:
_cur_line_reader = new PlainBinaryLineReader(_cur_file_reader);
break;
default: {
std::stringstream ss;
ss << "Unknown format type, cannot init line reader, type=" << range.format_type;
Expand Down Expand Up @@ -326,40 +332,53 @@ void BrokerScanner::close() {

void BrokerScanner::split_line(const Slice& line) {
_split_values.clear();
const char* value = line.data;
size_t start = 0; // point to the start pos of next col value.
size_t curpos = 0; // point to the start pos of separator matching sequence.
size_t p1 = 0; // point to the current pos of separator matching sequence.

// Separator: AAAA
//
// curpos
// ▼
// AAAA
// 1000AAAA2000AAAA
// ▲ ▲
// Start │
// p1

while (curpos < line.size) {
if (*(value + curpos + p1) != _value_separator[p1]) {
// Not match, move forward:
curpos += (p1 == 0 ? 1 : p1);
p1 = 0;
} else {
p1++;
if (p1 == _value_separator_length) {
// Match a separator
_split_values.emplace_back(value + start, curpos - start);
start = curpos + _value_separator_length;
curpos = start;
if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
PDataRow** ptr = reinterpret_cast<PDataRow**>(line.data);
PDataRow* row = *ptr;
for (const PDataColumn& col : (row)->col()) {
int len = col.value().size();
uint8_t* buf = new uint8_t[len];
memcpy(buf, col.value().c_str(), len);
_split_values.emplace_back(buf, len);
}
delete row;
delete ptr;
} else {
const char *value = line.data;
size_t start = 0; // point to the start pos of next col value.
size_t curpos = 0; // point to the start pos of separator matching sequence.
size_t p1 = 0; // point to the current pos of separator matching sequence.

// Separator: AAAA
//
// curpos
// ▼
// AAAA
// 1000AAAA2000AAAA
// ▲ ▲
// Start │
// p1

while (curpos < line.size) {
if (*(value + curpos + p1) != _value_separator[p1]) {
// Not match, move forward:
curpos += (p1 == 0 ? 1 : p1);
p1 = 0;
} else {
p1++;
if (p1 == _value_separator_length) {
// Match a separator
_split_values.emplace_back(value + start, curpos - start);
start = curpos + _value_separator_length;
curpos = start;
p1 = 0;
}
}
}
}

CHECK(curpos == line.size) << curpos << " vs " << line.size;
_split_values.emplace_back(value + start, curpos - start);
CHECK(curpos == line.size) << curpos << " vs " << line.size;
_split_values.emplace_back(value + start, curpos - start);
}
}

void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
Expand Down Expand Up @@ -454,7 +473,7 @@ bool BrokerScanner::convert_one_row(const Slice& line, Tuple* tuple, MemPool* tu

// Convert one row to this tuple
bool BrokerScanner::line_to_src_tuple(const Slice& line) {
if (!validate_utf8(line.data, line.size)) {
if (_file_format_type != TFileFormatType::FORMAT_PROTO && !validate_utf8(line.data, line.size)) {
std::stringstream error_msg;
error_msg << "data is not encoded by UTF-8";
_state->append_error_msg_to_file("Unable to display", error_msg.str());
Expand All @@ -474,7 +493,11 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
<< _value_separator << "], "
<< "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
_state->append_error_msg_to_file("", error_msg.str());
} else {
_state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
}
_counter->num_rows_filtered++;
return false;
} else if (_split_values.size() + columns_from_path.size() > _src_slot_descs.size()) {
Expand All @@ -484,7 +507,11 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
<< _value_separator << "], "
<< "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
_state->append_error_msg_to_file("", error_msg.str());
} else {
_state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
}
_counter->num_rows_filtered++;
return false;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "exec/base_scanner.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/mem_pool.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
Expand Down Expand Up @@ -99,6 +100,7 @@ class BrokerScanner : public BaseScanner {

std::string _value_separator;
std::string _line_delimiter;
TFileFormatType::type _file_format_type;
int _value_separator_length;
int _line_delimiter_length;

Expand Down
48 changes: 48 additions & 0 deletions be/src/exec/plain_binary_line_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/plain_binary_line_reader.h"

#include "common/status.h"
#include "exec/file_reader.h"

namespace doris {

PlainBinaryLineReader::PlainBinaryLineReader(FileReader* file_reader)
: _file_reader(file_reader) {
}

PlainBinaryLineReader::~PlainBinaryLineReader() {
close();
}

void PlainBinaryLineReader::close() {
}

Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) {
std::unique_ptr<uint8_t[]> file_buf;
int64_t read_size = 0;
RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size));
*ptr = file_buf.release();
*size = read_size;
if (read_size == 0) {
*eof = true;
}
return Status::OK();
}

} // namespace doris
40 changes: 40 additions & 0 deletions be/src/exec/plain_binary_line_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "exec/line_reader.h"

namespace doris {

class FileReader;

class PlainBinaryLineReader : public LineReader {
public:
PlainBinaryLineReader(FileReader* file_reader);

virtual ~PlainBinaryLineReader();

virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;

virtual void close() override;

private:
FileReader* _file_reader;
};

} // namespace doris
72 changes: 71 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
Expand Down Expand Up @@ -136,6 +139,9 @@ class FragmentExecState {

std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }

void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }

private:
void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);

Expand Down Expand Up @@ -166,6 +172,8 @@ class FragmentExecState {
std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;

std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
// The pipe for data transfering, such as insert.
std::shared_ptr<StreamLoadPipe> _pipe;
};

FragmentExecState::FragmentExecState(const TUniqueId& query_id,
Expand Down Expand Up @@ -239,6 +247,9 @@ Status FragmentExecState::cancel_before_execute() {
// set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close().
_executor.set_abort();
_executor.cancel();
if (_pipe != nullptr) {
_pipe->cancel();
}
return Status::OK();
}

Expand All @@ -249,6 +260,9 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
_executor.set_is_report_on_cancel(false);
}
_executor.cancel();
if (_pipe != nullptr) {
_pipe->cancel();
}
return Status::OK();
}

Expand Down Expand Up @@ -457,7 +471,63 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
}

Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
if (params.txn_conf.need_txn) {
StreamLoadContext* stream_load_cxt = new StreamLoadContext(_exec_env);
stream_load_cxt->db = params.txn_conf.db;
stream_load_cxt->db_id = params.txn_conf.db_id;
stream_load_cxt->table = params.txn_conf.tbl;
stream_load_cxt->txn_id = params.txn_conf.txn_id;
stream_load_cxt->id = UniqueId(params.params.query_id);
stream_load_cxt->put_result.params = params;
stream_load_cxt->use_streaming = true;
stream_load_cxt->load_type = TLoadType::MANUL_LOAD;
stream_load_cxt->load_src_type = TLoadSourceType::RAW;
stream_load_cxt->label = params.import_label;
stream_load_cxt->format = TFileFormatType::FORMAT_CSV_PLAIN;
stream_load_cxt->timeout_second = 3600;
stream_load_cxt->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
stream_load_cxt->need_commit_self = true;
stream_load_cxt->need_rollback = true;
// total_length == -1 means read one message from pipe in once time, don't care the length.
auto pipe = std::make_shared<StreamLoadPipe>(
1024 * 1024 /* max_buffered_bytes */,
64 * 1024 /* min_chunk_size */,
-1 /* total_length */,
true /* use_proto */);
stream_load_cxt->body_sink = pipe;
stream_load_cxt->max_filter_ratio = params.txn_conf.max_filter_ratio;

RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe));

RETURN_IF_ERROR(
_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt, pipe));
set_pipe(params.params.fragment_instance_id, pipe);
return Status::OK();
} else {
return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
}
}

void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe) {
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
if (iter != _fragment_map.end()) {
_fragment_map[fragment_instance_id]->set_pipe(pipe);
}
}
}

std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) {
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
if (iter != _fragment_map.end()) {
return _fragment_map[fragment_instance_id]->get_pipe();
} else {
return nullptr;
}
}
}

Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
Expand Down
Loading