Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTa
break;
}
if (task.push_req.push_type == TPushType::LOAD
|| task.push_req.push_type == TPushType::LOAD_DELETE) {
|| task.push_req.push_type == TPushType::LOAD_DELETE
|| task.push_req.push_type == TPushType::LOAD_V2) {
_push_workers->submit_task(task);
} else if (task.push_req.push_type == TPushType::DELETE) {
_delete_workers->submit_task(task);
Expand Down
21 changes: 11 additions & 10 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) {
#endif

LOG(INFO) << "get push task. signature: " << agent_task_req.signature
<< " priority: " << priority;
<< " priority: " << priority << " push_type: " << push_req.push_type;
vector<TTabletInfo> tablet_infos;

EngineBatchLoadTask engine_task(push_req, &tablet_infos, agent_task_req.signature, &status);
Expand All @@ -645,7 +645,8 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) {
}

if (status == DORIS_SUCCESS) {
VLOG(3) << "push ok.signature: " << agent_task_req.signature;
VLOG(3) << "push ok. signature: " << agent_task_req.signature
<< ", push_type: " << push_req.push_type;
error_msgs.push_back("push success");

++_s_report_version;
Expand Down Expand Up @@ -1104,14 +1105,14 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
disk.__set_used(root_path_info.is_used);
disks[root_path_info.path] = disk;

DorisMetrics::instance()->disks_total_capacity.set_metric(
root_path_info.path, root_path_info.disk_capacity);
DorisMetrics::instance()->disks_avail_capacity.set_metric(
root_path_info.path, root_path_info.available);
DorisMetrics::instance()->disks_data_used_capacity.set_metric(
root_path_info.path, root_path_info.data_used_capacity);
DorisMetrics::instance()->disks_state.set_metric(
root_path_info.path, root_path_info.is_used ? 1L : 0L);
DorisMetrics::instance()->disks_total_capacity.set_metric(root_path_info.path,
root_path_info.disk_capacity);
DorisMetrics::instance()->disks_avail_capacity.set_metric(root_path_info.path,
root_path_info.available);
DorisMetrics::instance()->disks_data_used_capacity.set_metric(root_path_info.path,
root_path_info.data_used_capacity);
DorisMetrics::instance()->disks_state.set_metric(root_path_info.path,
root_path_info.is_used ? 1L : 0L);
}
request.__set_disks(disks);

Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ enum OLAPCompressionType {
};

enum PushType {
PUSH_NORMAL = 1,
PUSH_FOR_DELETE = 2,
PUSH_FOR_LOAD_DELETE = 3,
PUSH_NORMAL = 1, // for broker/hadoop load
PUSH_FOR_DELETE = 2, // for delete
PUSH_FOR_LOAD_DELETE = 3, // not use
PUSH_NORMAL_V2 = 4, // for spark load
};

enum ReaderType {
Expand Down
283 changes: 280 additions & 3 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@

#include <boost/filesystem.hpp>

#include "common/status.h"
#include "exec/parquet_scanner.h"
#include "olap/row.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "runtime/exec_env.h"

using std::list;
using std::map;
Expand Down Expand Up @@ -184,9 +188,16 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
}

// write
res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add),
&(tablet_vars->at(1).rowset_to_add));
if (push_type == PUSH_NORMAL_V2) {
res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add),
&(tablet_vars->at(1).rowset_to_add));

} else {
res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add),
&(tablet_vars->at(1).rowset_to_add));
}
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res
<< ", failed to process realtime push."
Expand Down Expand Up @@ -248,6 +259,146 @@ void PushHandler::_get_tablet_infos(const vector<TabletVars>& tablet_vars,
}
}

OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet,
TabletSharedPtr new_tablet,
RowsetSharedPtr* cur_rowset,
RowsetSharedPtr* new_rowset) {
OLAPStatus res = OLAP_SUCCESS;
uint32_t num_rows = 0;
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);

do {
VLOG(3) << "start to convert delta file.";

// 1. init RowsetBuilder of cur_tablet for current push
VLOG(3) << "init rowset builder. tablet=" << cur_tablet->full_name()
<< ", block_row_size=" << cur_tablet->num_rows_per_row_block();
RowsetWriterContext context;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.tablet_uid = cur_tablet->tablet_uid();
context.tablet_id = cur_tablet->tablet_id();
context.partition_id = _request.partition_id;
context.tablet_schema_hash = cur_tablet->schema_hash();
context.rowset_type = StorageEngine::instance()->default_rowset_type();
context.rowset_path_prefix = cur_tablet->tablet_path();
context.tablet_schema = &(cur_tablet->tablet_schema());
context.rowset_state = PREPARED;
context.txn_id = _request.transaction_id;
context.load_id = load_id;
// although the spark load output files are fully sorted,
// but it depends on thirparty implementation, so we conservatively
// set this value to OVERLAP_UNKNOWN
context.segments_overlap = OVERLAP_UNKNOWN;

std::unique_ptr<RowsetWriter> rowset_writer;
res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
if (OLAP_SUCCESS != res) {
LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
<< ", txn_id=" << _request.transaction_id
<< ", res=" << res;
break;
}

// 2. Init PushBrokerReader to read broker file if exist,
// in case of empty push this will be skipped.
std::string path = _request.broker_scan_range.ranges[0].path;
LOG(INFO) << "tablet=" << cur_tablet->full_name() << ", file path=" << path
<< ", file size=" << _request.broker_scan_range.ranges[0].file_size;

if (!path.empty()) {
std::unique_ptr<PushBrokerReader> reader(new(std::nothrow) PushBrokerReader());
if (reader == nullptr) {
LOG(WARNING) << "fail to create reader. tablet=" << cur_tablet->full_name();
res = OLAP_ERR_MALLOC_ERROR;
break;
}

// init schema
std::unique_ptr<Schema> schema(new(std::nothrow) Schema(cur_tablet->tablet_schema()));
if (schema == nullptr) {
LOG(WARNING) << "fail to create schema. tablet=" << cur_tablet->full_name();
res = OLAP_ERR_MALLOC_ERROR;
break;
}

// init Reader
if (OLAP_SUCCESS != (res = reader->init(schema.get(),
_request.broker_scan_range,
_request.desc_tbl))) {
LOG(WARNING) << "fail to init reader. res=" << res
<< ", tablet=" << cur_tablet->full_name();
res = OLAP_ERR_PUSH_INIT_ERROR;
break;
}

// 3. Init Row
uint8_t* tuple_buf = reader->mem_pool()->allocate(schema->schema_size());
ContiguousRow row(schema.get(), tuple_buf);

// 4. Read data from broker and write into SegmentGroup of cur_tablet
// Convert from raw to delta
VLOG(3) << "start to convert etl file to delta.";
while (!reader->eof()) {
res = reader->next(&row);
if (OLAP_SUCCESS != res) {
LOG(WARNING) << "read next row failed."
<< " res=" << res << " read_rows=" << num_rows;
break;
} else {
if (reader->eof()) {
break;
}
if (OLAP_SUCCESS != (res = rowset_writer->add_row(row))) {
LOG(WARNING) << "fail to attach row to rowset_writer. "
<< "res=" << res
<< ", tablet=" << cur_tablet->full_name()
<< ", read_rows=" << num_rows;
break;
}
num_rows++;
}
}

reader->print_profile();
reader->close();
}

if (rowset_writer->flush() != OLAP_SUCCESS) {
LOG(WARNING) << "failed to finalize writer";
break;
}
*cur_rowset = rowset_writer->build();
if (*cur_rowset == nullptr) {
LOG(WARNING) << "fail to build rowset";
res = OLAP_ERR_MALLOC_ERROR;
break;
}

_write_bytes += (*cur_rowset)->data_disk_size();
_write_rows += (*cur_rowset)->num_rows();

// 5. Convert data for schema change tables
VLOG(10) << "load to related tables of schema_change if possible.";
if (new_tablet != nullptr) {
SchemaChangeHandler schema_change;
res = schema_change.schema_version_convert(cur_tablet, new_tablet,
cur_rowset, new_rowset);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to change schema version for delta."
<< "[res=" << res << " new_tablet='"
<< new_tablet->full_name() << "']";
}
}
} while (0);

VLOG(10) << "convert delta file end. res=" << res
<< ", tablet=" << cur_tablet->full_name()
<< ", processed_rows" << num_rows;
return res;
}

OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet,
TabletSharedPtr new_tablet,
RowsetSharedPtr* cur_rowset,
Expand Down Expand Up @@ -761,6 +912,132 @@ OLAPStatus LzoBinaryReader::_next_block() {
return res;
}

OLAPStatus PushBrokerReader::init(const Schema* schema,
const TBrokerScanRange& t_scan_range,
const TDescriptorTable& t_desc_tbl) {
// init schema
_schema = schema;

// init runtime state, runtime profile, counter
TUniqueId dummy_id;
dummy_id.hi = 0;
dummy_id.lo = 0;
TPlanFragmentExecParams params;
params.fragment_instance_id = dummy_id;
params.query_id = dummy_id;
TExecPlanFragmentParams fragment_params;
fragment_params.params = params;
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
TQueryOptions query_options;
TQueryGlobals query_globals;
_runtime_state.reset(new RuntimeState(fragment_params, query_options, query_globals,
ExecEnv::GetInstance()));
DescriptorTbl* desc_tbl = NULL;
Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to create descriptor table, msg: " << status.get_error_msg();
return OLAP_ERR_PUSH_INIT_ERROR;
}
_runtime_state->set_desc_tbl(desc_tbl);
status = _runtime_state->init_mem_trackers(dummy_id);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
return OLAP_ERR_PUSH_INIT_ERROR;
}
_runtime_profile.reset(_runtime_state->runtime_profile());
_runtime_profile->set_name("PushBrokerReader");
_mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker()));
_mem_pool.reset(new MemPool(_mem_tracker.get()));
_counter.reset(new ScannerCounter());

// init scanner
BaseScanner *scanner = nullptr;
switch (t_scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET:
scanner = new ParquetScanner(_runtime_state.get(),
_runtime_profile.get(),
t_scan_range.params,
t_scan_range.ranges,
t_scan_range.broker_addresses,
_counter.get());
break;
default:
LOG(WARNING) << "Unsupported file format type: " << t_scan_range.ranges[0].format_type;
return OLAP_ERR_PUSH_INIT_ERROR;
}
_scanner.reset(scanner);
status = _scanner->open();
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to open scanner, msg: " << status.get_error_msg();
return OLAP_ERR_PUSH_INIT_ERROR;
}

// init tuple
auto tuple_id = t_scan_range.params.dest_tuple_id;
_tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(tuple_id);
if (_tuple_desc == nullptr) {
std::stringstream ss;
LOG(WARNING) << "Failed to get tuple descriptor, tuple_id: " << tuple_id;
return OLAP_ERR_PUSH_INIT_ERROR;
}

int tuple_buffer_size = _tuple_desc->byte_size();
void* tuple_buffer = _mem_pool->allocate(tuple_buffer_size);
if (tuple_buffer == nullptr) {
LOG(WARNING) << "Allocate memory for tuple failed";
return OLAP_ERR_PUSH_INIT_ERROR;
}
_tuple = reinterpret_cast<Tuple*>(tuple_buffer);

_ready = true;
return OLAP_SUCCESS;
}

OLAPStatus PushBrokerReader::next(ContiguousRow* row) {
if (!_ready || row == nullptr) {
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}

memset(_tuple, 0, _tuple_desc->num_null_bytes());
// Get from scanner
Status status = _scanner->get_next(_tuple, _mem_pool.get(), &_eof);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Scanner get next tuple failed";
return OLAP_ERR_PUSH_INPUT_DATA_ERROR;
}
if (_eof) {
return OLAP_SUCCESS;
}

auto slot_descs = _tuple_desc->slots();
size_t num_key_columns = _schema->num_key_columns();

// finalize row
for (size_t i = 0; i < slot_descs.size(); ++i) {
auto cell = row->cell(i);
const SlotDescriptor* slot = slot_descs[i];
bool is_null = _tuple->is_null(slot->null_indicator_offset());
const void* value = _tuple->get_slot(slot->tuple_offset());
// try execute init method defined in aggregateInfo
// by default it only copies data into cell
_schema->column(i)->consume(&cell, (const char*)value, is_null,
_mem_pool.get(), _runtime_state->obj_pool());
// if column(i) is a value column, try execute finalize method defined in aggregateInfo
// to convert data into final format
if (i >= num_key_columns) {
_schema->column(i)->agg_finalize(&cell, _mem_pool.get());
}
}

return OLAP_SUCCESS;
}

void PushBrokerReader::print_profile() {
std::stringstream ss;
_runtime_profile->pretty_print(&ss);
LOG(INFO) << ss.str();
}

string PushHandler::_debug_version_list(const Versions& versions) const {
std::ostringstream txt;
txt << "Versions: ";
Expand Down
Loading