16 changes: 8 additions & 8 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -133,9 +133,9 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
}
}

-int64_t left_time = ctx->kafka_info->max_interval_s;
-int64_t left_rows = ctx->kafka_info->max_batch_rows;
-int64_t left_bytes = ctx->kafka_info->max_batch_size;
+int64_t left_time = ctx->max_interval_s;
+int64_t left_rows = ctx->max_batch_rows;
+int64_t left_bytes = ctx->max_batch_size;

std::shared_ptr<KafkaConsumerPipe> kakfa_pipe = std::static_pointer_cast<KafkaConsumerPipe>(ctx->body_sink);

@@ -145,7 +145,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
<< ", batch size: " << left_bytes
<< ". " << ctx->brief();

-// copy one
+// copy one
std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;
MonotonicStopWatch watch;
watch.start();
@@ -169,15 +169,15 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
<< ", left rows=" << left_rows
<< ", left bytes=" << left_bytes;

-if (left_bytes == ctx->kafka_info->max_batch_size) {
+if (left_bytes == ctx->max_batch_size) {
// nothing to be consumed, cancel it
// we do not allow finishing stream load pipe without data
kakfa_pipe->cancel();
_cancelled = true;
return Status::CANCELLED;
} else {
-DCHECK(left_bytes < ctx->kafka_info->max_batch_size);
-DCHECK(left_rows < ctx->kafka_info->max_batch_rows);
+DCHECK(left_bytes < ctx->max_batch_size);
+DCHECK(left_rows < ctx->max_batch_rows);
kakfa_pipe->finish();
ctx->kafka_info->cmt_offset = std::move(cmt_offset);
_finished = true;
@@ -223,7 +223,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
return st;
}

-left_time = ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
+left_time = ctx->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
}

return Status::OK;
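For orientation, the hunks above change where the consume-loop budgets come from: the time, row, and byte limits are now read from the task-level fields on StreamLoadContext rather than from kafka_info. Below is a minimal, self-contained sketch of that budget logic with hypothetical names and fake data; it is an illustration only, not the actual KafkaDataConsumer code.

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for the per-task limits that StreamLoadContext now
// carries; the defaults mirror the ones added in stream_load_context.h.
struct TaskLimits {
    int64_t max_interval_s = 5;
    int64_t max_batch_rows = 100000;
    int64_t max_batch_size = 100 * 1024 * 1024;  // 100MB
};

int main() {
    TaskLimits limits;
    int64_t left_rows  = limits.max_batch_rows;
    int64_t left_bytes = limits.max_batch_size;
    int64_t elapsed_ns = 0;  // pretend monotonic clock, in nanoseconds

    // Simulate consuming three 1KB messages, 1ms apart.
    for (int i = 0; i < 3; ++i) {
        left_rows  -= 1;
        left_bytes -= 1024;
        elapsed_ns += 1000 * 1000;
    }

    // Same budget arithmetic as the loop above: the batch ends as soon as
    // the time, row, or byte budget is used up.
    int64_t left_time = limits.max_interval_s - elapsed_ns / 1000 / 1000 / 1000;
    bool batch_done = left_time <= 0 || left_rows <= 0 || left_bytes <= 0;

    // If nothing was consumed at all, the pipe is cancelled instead of
    // finished, because an empty stream load batch is not allowed.
    bool would_cancel = (left_bytes == limits.max_batch_size);

    std::cout << "batch_done=" << batch_done
              << ", would_cancel=" << would_cancel << std::endl;
    return 0;
}
```

Running the sketch prints batch_done=0, would_cancel=0: the simulated batch consumed some data but exhausted none of the three limits.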
4 changes: 4 additions & 0 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -53,6 +53,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
ctx->label = task.label;
ctx->auth.auth_code = task.auth_code;

+if (task.__isset.max_interval_s) { ctx->max_interval_s = task.max_interval_s; }
+if (task.__isset.max_batch_rows) { ctx->max_batch_rows = task.max_batch_rows; }
+if (task.__isset.max_batch_size) { ctx->max_batch_size = task.max_batch_size; }

// set execute plan params
TStreamLoadPutResult put_result;
TStatus tstatus;
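The guards added above follow the usual Thrift optional-field pattern: the generated struct exposes a per-field __isset flag, and the BE keeps its own defaults unless the FE actually sent a value. Here is a small stand-alone sketch of that pattern using hypothetical Fake* types, not the real generated TRoutineLoadTask or StreamLoadContext definitions.

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for a Thrift-generated struct: optional fields come
// with a matching __isset flag saying whether the sender filled them in.
struct FakeRoutineLoadTask {
    struct { bool max_interval_s = false; bool max_batch_rows = false; } __isset;
    int64_t max_interval_s = 0;
    int64_t max_batch_rows = 0;
};

struct FakeStreamLoadContext {
    // BE-side defaults, used when the FE did not send a value.
    int64_t max_interval_s = 5;
    int64_t max_batch_rows = 100000;
};

int main() {
    FakeRoutineLoadTask task;
    task.__isset.max_interval_s = true;  // the FE only set the interval
    task.max_interval_s = 30;

    FakeStreamLoadContext ctx;
    // Same guard pattern as submit_task(): override a default only when the
    // corresponding optional field was actually set.
    if (task.__isset.max_interval_s) { ctx.max_interval_s = task.max_interval_s; }
    if (task.__isset.max_batch_rows) { ctx.max_batch_rows = task.max_batch_rows; }

    std::cout << "max_interval_s=" << ctx.max_interval_s
              << ", max_batch_rows=" << ctx.max_batch_rows << std::endl;
    return 0;
}
```

With only the interval set, the sketch prints max_interval_s=30, max_batch_rows=100000, i.e. the row limit falls back to the BE default.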
9 changes: 6 additions & 3 deletions be/src/runtime/stream_load/stream_load_context.h
@@ -48,9 +48,6 @@ class KafkaLoadInfo {
for (auto& p : t_info.partition_begin_offset) {
cmt_offset[p.first] = p.second -1;
}
-if (t_info.__isset.max_interval_s) { max_interval_s = t_info.max_interval_s; }
-if (t_info.__isset.max_batch_rows) { max_batch_rows = t_info.max_batch_rows; }
-if (t_info.__isset.max_batch_size) { max_batch_size = t_info.max_batch_size; }
}

public:
@@ -121,6 +118,12 @@ class StreamLoadContext {

AuthInfo auth;

+// the following members control the max progress of a consuming
+// process. if any of them is reached, the consuming will finish.
+int64_t max_interval_s = 5;
+int64_t max_batch_rows = 100000;
+int64_t max_batch_size = 100 * 1024 * 1024; // 100MB

// only used to check if we receive whole body
size_t body_bytes = 0;
size_t receive_bytes = 0;
13 changes: 2 additions & 11 deletions fe/src/main/cup/sql_parser.cup
@@ -386,7 +386,6 @@ nonterminal TablePattern tbl_pattern;
nonterminal String ident_or_star;

// Routine load
-nonterminal LoadColumnsInfo load_columns_info;
nonterminal ParseNode load_property;
nonterminal List<ParseNode> opt_load_property_list;

@@ -449,7 +448,7 @@ query ::=
;

import_columns_stmt ::=
-KW_COLUMNS import_column_descs:columns
+KW_COLUMNS LPAREN import_column_descs:columns RPAREN
{:
RESULT = new ImportColumnsStmt(columns);
:}
@@ -1177,7 +1176,7 @@ load_property ::=
{:
RESULT = colSep;
:}
-| load_columns_info:columnsInfo
+| import_columns_stmt:columnsInfo
{:
RESULT = columnsInfo;
:}
@@ -1191,14 +1190,6 @@
:}
;

-load_columns_info ::=
-col_list:colList
-opt_col_mapping_list:colMappingList
-{:
-RESULT = new LoadColumnsInfo(colList, colMappingList);
-:}
-;

pause_routine_load_stmt ::=
KW_PAUSE KW_ROUTINE KW_LOAD ident:name
{: