Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions be/src/exec/es_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ Status EsScanNode::open(RuntimeState* state) {
for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) {
int conjunct_index = predicate_to_conjunct[i];
if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) {
_pushdown_conjunct_ctxs.push_back(*(_conjunct_ctxs.begin() + conjunct_index));
_conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index);
}
}
Expand Down Expand Up @@ -259,7 +260,8 @@ Status EsScanNode::close(RuntimeState* state) {
VLOG(1) << "EsScanNode::Close";
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
SCOPED_TIMER(_runtime_profile->total_time_counter());

Expr::close(_pushdown_conjunct_ctxs, state);
RETURN_IF_ERROR(ExecNode::close(state));
for (int i = 0; i < _addresses.size(); ++i) {
TExtCloseParams params;
params.__set_scan_handle(_scan_handles[i]);
Expand Down Expand Up @@ -307,7 +309,6 @@ Status EsScanNode::close(RuntimeState* state) {
#endif
}

RETURN_IF_ERROR(ExecNode::close(state));
return Status::OK;
}

Expand Down Expand Up @@ -563,21 +564,53 @@ bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* l
case TExprNodeType::FLOAT_LITERAL: {
TFloatLiteral float_literal;
void* value = context->get_value(expr, NULL);
float_literal.__set_value(*reinterpret_cast<float*>(value));
switch (expr->type().type) {
case TYPE_FLOAT: {
float_literal.__set_value(*reinterpret_cast<float*>(value));
break;
}
case TYPE_DOUBLE: {
float_literal.__set_value(*((double *)value));
break;
}
default:
return false;
}
literal->__set_float_literal(float_literal);
return true;
}
case TExprNodeType::INT_LITERAL: {
TIntLiteral int_literal;
void* value = context->get_value(expr, NULL);
int_literal.__set_value(*reinterpret_cast<int32_t*>(value));
int64_t int_val = 0;
switch (expr->type().type) {
case TYPE_TINYINT: {
int_val = *reinterpret_cast<int8_t*>(value);
break;
}
case TYPE_SMALLINT: {
int_val = *reinterpret_cast<int16_t*>(value);
break;
}
case TYPE_INT: {
int_val = *reinterpret_cast<int32_t*>(value);
break;
}
case TYPE_BIGINT: {
int_val = *reinterpret_cast<int64_t*>(value);
break;
}
default:
return false;
}
int_literal.__set_value(int_val);
literal->__set_int_literal(int_literal);
return true;
}
case TExprNodeType::STRING_LITERAL: {
TStringLiteral string_literal;
void* value = context->get_value(expr, NULL);
string_literal.__set_value(*reinterpret_cast<string*>(value));
string_literal.__set_value((reinterpret_cast<StringValue*>(value))->debug_string());
literal->__set_string_literal(string_literal);
return true;
}
Expand Down Expand Up @@ -783,6 +816,7 @@ Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
!reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx])) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATETIME"));
}
reinterpret_cast<DateTimeValue*>(slot)->set_type(TIME_DATETIME);
break;
}
case TYPE_DECIMAL: {
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/es_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class EsScanNode : public ScanNode {
std::vector<TNetworkAddress> _addresses;
std::vector<std::string> _scan_handles;
std::vector<int> _offsets;
std::vector<ExprContext*> _pushdown_conjunct_ctxs;
};

}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/datetime_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1541,7 +1541,7 @@ int DateTimeValue::unix_timestamp() const {
return seconds;
}

bool DateTimeValue::from_unixtime(int seconds) {
bool DateTimeValue::from_unixtime(int64_t seconds) {
if (seconds < 0) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/datetime_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class DateTimeValue {

int unix_timestamp() const;

bool from_unixtime(int);
bool from_unixtime(int64_t);

bool operator==(const DateTimeValue& other) const {
// NOTE: This is not same with MySQL.
Expand Down
16 changes: 16 additions & 0 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,19 @@ std::string BrokerTableDescriptor::debug_string() const {
return out.str();
}

EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc)
: TableDescriptor(tdesc) {
}

EsTableDescriptor::~EsTableDescriptor() {
}

std::string EsTableDescriptor::debug_string() const {
std::stringstream out;
out << "EsTable(" << TableDescriptor::debug_string() << ")";
return out.str();
}

KuduTableDescriptor::KuduTableDescriptor(const TTableDescriptor& tdesc)
: TableDescriptor(tdesc),
table_name_(tdesc.kuduTable.table_name),
Expand Down Expand Up @@ -483,6 +496,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
case TTableType::BROKER_TABLE:
desc = pool->add(new BrokerTableDescriptor(tdesc));
break;
case TTableType::ES_TABLE:
desc = pool->add(new EsTableDescriptor(tdesc));
break;
default:
DCHECK(false) << "invalid table type: " << tdesc.tableType;
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ public :
private :
};

class EsTableDescriptor : public TableDescriptor {
public :
EsTableDescriptor(const TTableDescriptor& tdesc);
virtual ~EsTableDescriptor();
virtual std::string debug_string() const;
private :
};

// Descriptor for a KuduTable
class KuduTableDescriptor : public TableDescriptor {
public:
Expand Down
35 changes: 18 additions & 17 deletions fe/src/main/java/org/apache/doris/catalog/EsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public class EsTable extends Table {

private String hosts;
private String[] seeds;
private String userName;
private String passwd;
private String userName = "";
private String passwd = "";
private String indexName;
private String mappingType = "doc";
private String mappingType = "_doc";
// only save the partition definition, save the partition key,
// partition list is got from es cluster dynamically and is saved in esTableState
private PartitionInfo partitionInfo;
Expand All @@ -77,33 +77,34 @@ private void validate(Map<String, String> properties) throws DdlException {
+ "they are: hosts, user, password, index");
}

hosts = properties.get(HOSTS);
if (Strings.isNullOrEmpty(hosts)) {
if (Strings.isNullOrEmpty(properties.get(HOSTS))
|| Strings.isNullOrEmpty(properties.get(HOSTS).trim())) {
throw new DdlException("Hosts of ES table is null. "
+ "Please add properties('hosts'='xxx.xxx.xxx.xxx,xxx.xxx.xxx.xxx') when create table");
}
hosts = properties.get(HOSTS).trim();
seeds = hosts.split(",");
// TODO(ygl) validate the seeds?

userName = properties.get(USER);
if (Strings.isNullOrEmpty(userName)) {
userName = "";
if (!Strings.isNullOrEmpty(properties.get(USER))
&& !Strings.isNullOrEmpty(properties.get(USER).trim())) {
userName = properties.get(USER).trim();
}

passwd = properties.get(PASSWORD);
if (passwd == null) {
passwd = "";
if (!Strings.isNullOrEmpty(properties.get(PASSWORD))
&& !Strings.isNullOrEmpty(properties.get(PASSWORD).trim())) {
passwd = properties.get(PASSWORD).trim();
}

indexName = properties.get(INDEX);
if (Strings.isNullOrEmpty(indexName)) {
if (Strings.isNullOrEmpty(properties.get(INDEX))
|| Strings.isNullOrEmpty(properties.get(INDEX).trim())) {
throw new DdlException("Index of ES table is null. "
+ "Please add properties('index'='xxxx') when create table");
}
indexName = properties.get(INDEX).trim();

mappingType = properties.get(TYPE);
if (Strings.isNullOrEmpty(mappingType)) {
mappingType = "docs";
if (!Strings.isNullOrEmpty(properties.get(TYPE))
&& !Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
mappingType = properties.get(TYPE).trim();
}
}

Expand Down
3 changes: 3 additions & 0 deletions fe/src/main/java/org/apache/doris/planner/EsScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ private void assignBackends() throws UserException {
// TODO (ygl) should not get all shards, prune unrelated shard
private List<TScanRangeLocations> getShardLocations() throws UserException {
// has to get partition info from es state not from table because the partition info is generated from es cluster state dynamically
if (esTableState == null) {
throw new UserException("EsTable shard info has not been synced, wait some time or check log");
}
Collection<Long> partitionIds = partitionPrune(esTableState.getPartitionInfo());
List<EsIndexState> selectedIndex = Lists.newArrayList();
ArrayList<String> unPartitionedIndices = Lists.newArrayList();
Expand Down