3 changes: 2 additions & 1 deletion be/src/exec/broker_scan_node.cpp
@@ -344,7 +344,7 @@ Status BrokerScanNode::scanner_scan(
tuple = reinterpret_cast<Tuple*>(new_tuple);
counter->num_rows_returned++;
} else {
counter->num_rows_filtered++;
counter->num_rows_unselected++;
}
}

@@ -409,6 +409,7 @@ void BrokerScanNode::scanner_worker(int start_idx, int length) {
// Update stats
_runtime_state->update_num_rows_load_success(counter.num_rows_returned);
_runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered);
_runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected);

// scanner is going to finish
{
10 changes: 7 additions & 3 deletions be/src/exec/broker_scanner.h
@@ -49,11 +49,15 @@ class RuntimeProfile;
class StreamLoadPipe;

struct BrokerScanCounter {
BrokerScanCounter() : num_rows_returned(0), num_rows_filtered(0) {
BrokerScanCounter() :
num_rows_returned(0),
num_rows_filtered(0),
num_rows_unselected(0) {
}

int64_t num_rows_returned;
int64_t num_rows_filtered;
int64_t num_rows_returned; // qualified rows
int64_t num_rows_filtered; // unqualified rows
int64_t num_rows_unselected; // rows filtered out by predicates
};

// Broker scanner convert the data read from broker to doris's tuple.
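The comments above capture the heart of the patch: num_rows_filtered keeps counting rows that fail conversion or quality checks (and so count toward max_error_number), while the new num_rows_unselected counts rows dropped by load-time predicates, which are not errors. A minimal Java sketch of that three-way accounting, with checksPass and matchesPredicate as hypothetical stand-ins for the BE's real checks:

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: checksPass/matchesPredicate are hypothetical stand-ins for
    // the BE's convert/quality checks and its load-time predicates.
    class ScanCounter {
        final AtomicLong numRowsReturned = new AtomicLong();   // qualified rows, loaded
        final AtomicLong numRowsFiltered = new AtomicLong();   // unqualified rows, count as errors
        final AtomicLong numRowsUnselected = new AtomicLong(); // dropped by predicates, not errors

        void account(boolean checksPass, boolean matchesPredicate) {
            if (!checksPass) {
                numRowsFiltered.incrementAndGet();
            } else if (!matchesPredicate) {
                numRowsUnselected.incrementAndGet();
            } else {
                numRowsReturned.incrementAndGet();
            }
        }
    }

AtomicLong mirrors the std::atomic<int64_t> fields that RuntimeState uses for the same totals further down in this PR.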
15 changes: 8 additions & 7 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -135,14 +135,14 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {

_last_visit_time = time(nullptr);

int64_t left_time = ctx->max_interval_s;
int64_t left_time = ctx->max_interval_s * 1000;
int64_t left_rows = ctx->max_batch_rows;
int64_t left_bytes = ctx->max_batch_size;

std::shared_ptr<KafkaConsumerPipe> kakfa_pipe = std::static_pointer_cast<KafkaConsumerPipe>(ctx->body_sink);

LOG(INFO) << "start consumer"
<< ". max time(s): " << left_time
<< ". max time(ms): " << left_time
<< ", batch rows: " << left_rows
<< ", batch size: " << left_bytes
<< ". " << ctx->brief();
@@ -168,10 +168,11 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {

if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) {
LOG(INFO) << "kafka consume batch done"
<< ". left time=" << left_time
<< ", left rows=" << left_rows
<< ", left bytes=" << left_bytes
<< ", consumer time(ms)=" << consumer_watch.elapsed_time() / 1000 / 1000;
<< ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time
<< ", received rows=" << ctx->max_batch_rows - left_rows
<< ", received bytes=" << ctx->max_batch_size - left_bytes
<< ", kafka consume time(ms)=" << consumer_watch.elapsed_time() / 1000 / 1000;


if (left_bytes == ctx->max_batch_size) {
// nothing to be consumed, cancel it
@@ -230,7 +231,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
return st;
}

left_time = ctx->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000;
}

return Status::OK;
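The consumer change is a unit-granularity fix: the batch time budget is now tracked in milliseconds end to end. It starts at max_interval_s * 1000 and is recomputed each pass from the stopwatch's nanosecond reading divided down to milliseconds, replacing the old seconds arithmetic whose integer division truncated sub-second progress. A runnable sketch of the same bookkeeping, assuming a nanosecond clock like the BE's stopwatch (values illustrative):

    public class BatchBudget {
        public static void main(String[] args) throws InterruptedException {
            long maxIntervalSec = 1;               // stands in for ctx->max_interval_s
            long budgetMs = maxIntervalSec * 1000; // budget held in ms, as in the patch
            long startNs = System.nanoTime();
            long leftMs = budgetMs;
            while (leftMs > 0) {                   // the BE also gates on left_rows/left_bytes
                Thread.sleep(100);                 // stand-in for consuming one message
                leftMs = budgetMs - (System.nanoTime() - startNs) / 1_000_000; // ns -> ms
            }
            System.out.println("consume time(ms)=" + (budgetMs - leftMs));
        }
    }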
2 changes: 2 additions & 0 deletions be/src/runtime/runtime_state.cpp
@@ -61,6 +61,7 @@ RuntimeState::RuntimeState(
_root_node_id(-1),
_num_rows_load_success(0),
_num_rows_load_filtered(0),
_num_rows_load_unselected(0),
_num_print_error_rows(0),
_normal_row_number(0),
_error_row_number(0),
@@ -86,6 +87,7 @@ RuntimeState::RuntimeState(
_root_node_id(-1),
_num_rows_load_success(0),
_num_rows_load_filtered(0),
_num_rows_load_unselected(0),
_num_print_error_rows(0),
_normal_row_number(0),
_error_row_number(0),
10 changes: 10 additions & 0 deletions be/src/runtime/runtime_state.h
@@ -419,13 +419,22 @@ class RuntimeState {
return _num_rows_load_filtered.load();
}

int64_t num_rows_load_unselected() {
return _num_rows_load_unselected.load();
}

void update_num_rows_load_success(int64_t num_rows) {
_num_rows_load_success.fetch_add(num_rows);
}

void update_num_rows_load_filtered(int64_t num_rows) {
_num_rows_load_filtered.fetch_add(num_rows);
}

void update_num_rows_load_unselected(int64_t num_rows) {
_num_rows_load_unselected.fetch_add(num_rows);
}

void export_load_error(const std::string& error_msg);

void set_per_fragment_instance_idx(int idx) {
@@ -585,6 +594,7 @@ class RuntimeState {
std::vector<std::string> _output_files;
std::atomic<int64_t> _num_rows_load_success;
std::atomic<int64_t> _num_rows_load_filtered;
std::atomic<int64_t> _num_rows_load_unselected;
std::atomic<int64_t> _num_print_error_rows;

std::vector<std::string> _export_output_files;
2 changes: 2 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.cpp
@@ -60,6 +60,8 @@ std::string StreamLoadContext::to_json() const {
writer.Int64(number_loaded_rows);
writer.Key("NumberFilteredRows");
writer.Int64(number_filtered_rows);
writer.Key("NumberUnselectedRows");
writer.Int64(number_unselected_rows);
writer.Key("LoadBytes");
writer.Int64(receive_bytes);
writer.Key("LoadTimeMs");
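With the new key, a stream load result reports the unselected count alongside the filtered count. An illustrative fragment of the JSON this method emits, restricted to the keys visible in this hunk (values invented):

    "NumberFilteredRows": 2,
    "NumberUnselectedRows": 48,
    "LoadBytes": 102400,
    "LoadTimeMs": 350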
1 change: 1 addition & 0 deletions be/src/runtime/stream_load/stream_load_context.h
@@ -149,6 +149,7 @@ class StreamLoadContext {

int64_t number_loaded_rows = 0;
int64_t number_filtered_rows = 0;
int64_t number_unselected_rows = 0;
int64_t loaded_bytes = 0;
int64_t start_nanos = 0;
int64_t load_cost_nanos = 0;
3 changes: 3 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
@@ -53,6 +53,8 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
if (status.ok()) {
ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_success();
ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered();
ctx->number_unselected_rows = executor->runtime_state()->num_rows_load_unselected();

int64_t num_total_rows =
ctx->number_loaded_rows + ctx->number_filtered_rows;
if ((0.0 + ctx->number_filtered_rows) / num_total_rows > ctx->max_filter_ratio) {
@@ -218,6 +220,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
rl_attach.id = ctx->id.to_thrift();
rl_attach.__set_loadedRows(ctx->number_loaded_rows);
rl_attach.__set_filteredRows(ctx->number_filtered_rows);
rl_attach.__set_unselectedRows(ctx->number_unselected_rows);
rl_attach.__set_receivedBytes(ctx->receive_bytes);
rl_attach.__set_loadedBytes(ctx->loaded_bytes);
rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000);
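Note that the denominator of the max_filter_ratio check above stays number_loaded_rows + number_filtered_rows; unselected rows are deliberately excluded. With, say, 1000 loaded, 10 filtered, and 500 unselected rows, the ratio is 10 / 1010 ≈ 0.0099, so rows dropped by predicates can never push a job over its max_filter_ratio.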
@@ -21,8 +21,10 @@
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.routineload.LoadDataSourceType;
import org.apache.doris.load.routineload.RoutineLoadJob;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -32,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/*
@@ -79,9 +82,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
// max error number in ten thousand records
public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number";
// the following 3 properties limit the time and batch size of a single routine load task
public static final String MAX_BATCH_INTERVAL_SECOND = "max_batch_interval";
public static final String MAX_BATCH_ROWS = "max_batch_rows";
public static final String MAX_BATCH_SIZE = "max_batch_size";
public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY = "max_batch_interval";
public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows";
public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size";

// kafka type properties
public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list";
@@ -96,9 +99,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
.add(MAX_ERROR_NUMBER_PROPERTY)
.add(MAX_BATCH_INTERVAL_SECOND)
.add(MAX_BATCH_ROWS)
.add(MAX_BATCH_SIZE)
.add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
.add(MAX_BATCH_ROWS_PROPERTY)
.add(MAX_BATCH_SIZE_PROPERTY)
.build();

private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
@@ -121,17 +124,23 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private String dbName;
private RoutineLoadDesc routineLoadDesc;
private int desiredConcurrentNum = 1;
private int maxErrorNum = -1;
private int maxBatchIntervalS = -1;
private int maxBatchRows = -1;
private int maxBatchSizeBytes = -1;
private long maxErrorNum = -1;
private long maxBatchIntervalS = -1;
private long maxBatchRows = -1;
private long maxBatchSizeBytes = -1;

// kafka related properties
private String kafkaBrokerList;
private String kafkaTopic;
// pair<partition id, offset>
private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();

private static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; };
private static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; };
private static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; };
private static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> { return v > 200000; };
private static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; };

public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList,
Map<String, String> jobProperties,
String typeName, Map<String, String> dataSourceProperties) {
@@ -167,19 +176,19 @@ public int getDesiredConcurrentNum() {
return desiredConcurrentNum;
}

public int getMaxErrorNum() {
public long getMaxErrorNum() {
return maxErrorNum;
}

public int getMaxBatchIntervalS() {
public long getMaxBatchIntervalS() {
return maxBatchIntervalS;
}

public int getMaxBatchRows() {
public long getMaxBatchRows() {
return maxBatchRows;
}

public int getMaxBatchSize() {
public long getMaxBatchSize() {
return maxBatchSizeBytes;
}

@@ -268,26 +277,25 @@ private void checkJobProperties() throws AnalysisException {
throw new AnalysisException(optional.get() + " is invalid property");
}

desiredConcurrentNum = getIntegetPropertyOrDefault(DESIRED_CONCURRENT_NUMBER_PROPERTY,
"must be greater then 0", desiredConcurrentNum);
maxErrorNum = getIntegetPropertyOrDefault(MAX_ERROR_NUMBER_PROPERTY,
"must be greater then or equal to 0", maxErrorNum);
maxBatchIntervalS = getIntegetPropertyOrDefault(MAX_BATCH_INTERVAL_SECOND,
"must be greater then 0", maxBatchIntervalS);
maxBatchRows = getIntegetPropertyOrDefault(MAX_BATCH_ROWS, "must be greater then 0", maxBatchRows);
maxBatchSizeBytes = getIntegetPropertyOrDefault(MAX_BATCH_SIZE, "must be greater then 0", maxBatchSizeBytes);
}

private int getIntegetPropertyOrDefault(String propName, String hintMsg, int defaultVal) throws AnalysisException {
final String propVal = jobProperties.get(propName);
if (propVal != null) {
int intVal = getIntegerValueFromString(propVal, propName);
if (intVal <= 0) {
throw new AnalysisException(propName + " " + hintMsg);
}
return intVal;
}
return defaultVal;
desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY),
RoutineLoadJob.DEFAULT_TASK_MAX_CONCURRENT_NUM, DESIRED_CONCURRENT_NUMBER_PRED,
DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0")).intValue();

maxErrorNum = Util.getLongPropertyOrDefault(jobProperties.get(MAX_ERROR_NUMBER_PROPERTY),
RoutineLoadJob.DEFAULT_MAX_ERROR_NUM, MAX_ERROR_NUMBER_PRED,
MAX_ERROR_NUMBER_PROPERTY + " should >= 0");

maxBatchIntervalS = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_INTERVAL_SEC_PROPERTY),
RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND, MAX_BATCH_INTERVAL_PRED,
MAX_BATCH_INTERVAL_SEC_PROPERTY + " should be between 5 and 60");

maxBatchRows = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_ROWS_PROPERTY),
RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, MAX_BATCH_ROWS_PRED,
MAX_BATCH_ROWS_PROPERTY + " should > 200000");

maxBatchSizeBytes = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY),
RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED,
MAX_BATCH_SIZE_PROPERTY + " should be between 100MB and 1GB");
}

private void checkDataSourceProperties() throws AnalysisException {
@@ -50,7 +50,7 @@ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(columnName);
if (expr != null) {
sb.append(" = ").append(expr.toSql());
sb.append("=").append(expr.toSql());
}
return sb.toString();
}
@@ -18,15 +18,16 @@

package org.apache.doris.analysis;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ShowResultSetMetaData;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;

/*
Show routine load progress by routine load name

@@ -63,15 +64,16 @@ public class ShowRoutineLoadStmt extends ShowStmt {
new ImmutableList.Builder<String>()
.add("Id")
.add("Name")
.add("CreateTime")
.add("EndTime")
.add("DBId")
.add("TableId")
.add("State")
.add("DataSourceType")
.add("CurrentTaskNum")
.add("JobProperties")
.add("DataSourceProperties")
.add("CurrentTaskConcurrentNumber")
.add("TotalRows")
.add("TotalErrorRows")
.add("Statistic")
.add("Progress")
.add("ReasonOfStateChanged")
.build();
27 changes: 27 additions & 0 deletions fe/src/main/java/org/apache/doris/common/util/Util.java
@@ -19,7 +19,9 @@

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;

import org.apache.logging.log4j.LogManager;
@@ -40,6 +42,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;
import java.util.zip.Adler32;

public class Util {
@@ -362,5 +365,29 @@ public static String getResultForUrl(String urlStr, String encodedAuthInfo, int
LOG.debug("get result from url {}: {}", urlStr, sb.toString());
return sb.toString();
}

public static long getLongPropertyOrDefault(String valStr, long defaultVal, Predicate<Long> pred,
String hintMsg) throws AnalysisException {
if (Strings.isNullOrEmpty(valStr)) {
return defaultVal;
}

long result = defaultVal;
try {
result = Long.valueOf(valStr);
} catch (NumberFormatException e) {
throw new AnalysisException(hintMsg);
}

if (pred == null) {
return result;
}

if (!pred.test(result)) {
throw new AnalysisException(hintMsg);
}

return result;
}
}
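The new helper folds the parse, validate, or default pattern that checkJobProperties previously open-coded into one call. A usage sketch; the property name, default, and bound below are invented for illustration:

    import java.util.Map;
    import java.util.function.Predicate;

    // Hypothetical caller of Util.getLongPropertyOrDefault.
    static long parseTimeout(Map<String, String> jobProperties) throws AnalysisException {
        Predicate<Long> positive = (v) -> v > 0L;
        return Util.getLongPropertyOrDefault(
                jobProperties.get("timeout"), // raw string; null/empty falls back to the default
                3600L,                        // default when the property is unset
                positive,                     // bound check; failure raises AnalysisException
                "timeout should > 0");        // hint used as the exception message
    }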
