diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp index b4ae0a8e5a499c..f0fb17e2f32bfd 100644 --- a/be/src/exec/es_scan_node.cpp +++ b/be/src/exec/es_scan_node.cpp @@ -186,6 +186,7 @@ Status EsScanNode::open(RuntimeState* state) { for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) { int conjunct_index = predicate_to_conjunct[i]; if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) { + _pushdown_conjunct_ctxs.push_back(*(_conjunct_ctxs.begin() + conjunct_index)); _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index); } } @@ -259,7 +260,8 @@ Status EsScanNode::close(RuntimeState* state) { VLOG(1) << "EsScanNode::Close"; RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - + Expr::close(_pushdown_conjunct_ctxs, state); + RETURN_IF_ERROR(ExecNode::close(state)); for (int i = 0; i < _addresses.size(); ++i) { TExtCloseParams params; params.__set_scan_handle(_scan_handles[i]); @@ -307,7 +309,6 @@ Status EsScanNode::close(RuntimeState* state) { #endif } - RETURN_IF_ERROR(ExecNode::close(state)); return Status::OK; } @@ -563,21 +564,53 @@ bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* l case TExprNodeType::FLOAT_LITERAL: { TFloatLiteral float_literal; void* value = context->get_value(expr, NULL); - float_literal.__set_value(*reinterpret_cast(value)); + switch (expr->type().type) { + case TYPE_FLOAT: { + float_literal.__set_value(*reinterpret_cast(value)); + break; + } + case TYPE_DOUBLE: { + float_literal.__set_value(*((double *)value)); + break; + } + default: + return false; + } literal->__set_float_literal(float_literal); return true; } case TExprNodeType::INT_LITERAL: { TIntLiteral int_literal; void* value = context->get_value(expr, NULL); - int_literal.__set_value(*reinterpret_cast(value)); + int64_t int_val = 0; + switch (expr->type().type) { + case TYPE_TINYINT: { + int_val = *reinterpret_cast(value); + break; + } + case TYPE_SMALLINT: { + int_val = *reinterpret_cast(value); + break; + } + case TYPE_INT: { + int_val = *reinterpret_cast(value); + break; + } + case TYPE_BIGINT: { + int_val = *reinterpret_cast(value); + break; + } + default: + return false; + } + int_literal.__set_value(int_val); literal->__set_int_literal(int_literal); return true; } case TExprNodeType::STRING_LITERAL: { TStringLiteral string_literal; void* value = context->get_value(expr, NULL); - string_literal.__set_value(*reinterpret_cast(value)); + string_literal.__set_value((reinterpret_cast(value))->debug_string()); literal->__set_string_literal(string_literal); return true; } @@ -783,6 +816,7 @@ Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple, !reinterpret_cast(slot)->from_unixtime(col.long_vals[val_idx])) { return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATETIME")); } + reinterpret_cast(slot)->set_type(TIME_DATETIME); break; } case TYPE_DECIMAL: { diff --git a/be/src/exec/es_scan_node.h b/be/src/exec/es_scan_node.h index 64c3abc0edb57c..38071f4f27dce1 100644 --- a/be/src/exec/es_scan_node.h +++ b/be/src/exec/es_scan_node.h @@ -83,6 +83,7 @@ class EsScanNode : public ScanNode { std::vector _addresses; std::vector _scan_handles; std::vector _offsets; + std::vector _pushdown_conjunct_ctxs; }; } diff --git a/be/src/runtime/datetime_value.cpp b/be/src/runtime/datetime_value.cpp index d3e4f0e50be035..b45e9bc578afb0 100644 --- a/be/src/runtime/datetime_value.cpp +++ b/be/src/runtime/datetime_value.cpp @@ -1541,7 +1541,7 @@ int DateTimeValue::unix_timestamp() const { return seconds; } -bool DateTimeValue::from_unixtime(int seconds) { +bool DateTimeValue::from_unixtime(int64_t seconds) { if (seconds < 0) { return false; } diff --git a/be/src/runtime/datetime_value.h b/be/src/runtime/datetime_value.h index 9994b3d26bff98..3ae5c5af4ad50b 100644 --- a/be/src/runtime/datetime_value.h +++ b/be/src/runtime/datetime_value.h @@ -334,7 +334,7 @@ class DateTimeValue { int unix_timestamp() const; - bool from_unixtime(int); + bool from_unixtime(int64_t); bool operator==(const DateTimeValue& other) const { // NOTE: This is not same with MySQL. diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 7304318bfc8b8a..1b0b6c0a7dc25f 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -152,6 +152,19 @@ std::string BrokerTableDescriptor::debug_string() const { return out.str(); } +EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc) { +} + +EsTableDescriptor::~EsTableDescriptor() { +} + +std::string EsTableDescriptor::debug_string() const { + std::stringstream out; + out << "EsTable(" << TableDescriptor::debug_string() << ")"; + return out.str(); +} + KuduTableDescriptor::KuduTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc), table_name_(tdesc.kuduTable.table_name), @@ -483,6 +496,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb case TTableType::BROKER_TABLE: desc = pool->add(new BrokerTableDescriptor(tdesc)); break; + case TTableType::ES_TABLE: + desc = pool->add(new EsTableDescriptor(tdesc)); + break; default: DCHECK(false) << "invalid table type: " << tdesc.tableType; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 7dd41b6a133924..15219e09ec2944 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -241,6 +241,14 @@ public : private : }; +class EsTableDescriptor : public TableDescriptor { +public : + EsTableDescriptor(const TTableDescriptor& tdesc); + virtual ~EsTableDescriptor(); + virtual std::string debug_string() const; +private : +}; + // Descriptor for a KuduTable class KuduTableDescriptor : public TableDescriptor { public: diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index a0d930916411a9..fdcb7c843adfc5 100644 --- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -50,10 +50,10 @@ public class EsTable extends Table { private String hosts; private String[] seeds; - private String userName; - private String passwd; + private String userName = ""; + private String passwd = ""; private String indexName; - private String mappingType = "doc"; + private String mappingType = "_doc"; // only save the partition definition, save the partition key, // partition list is got from es cluster dynamically and is saved in esTableState private PartitionInfo partitionInfo; @@ -77,33 +77,34 @@ private void validate(Map properties) throws DdlException { + "they are: hosts, user, password, index"); } - hosts = properties.get(HOSTS); - if (Strings.isNullOrEmpty(hosts)) { + if (Strings.isNullOrEmpty(properties.get(HOSTS)) + || Strings.isNullOrEmpty(properties.get(HOSTS).trim())) { throw new DdlException("Hosts of ES table is null. " + "Please add properties('hosts'='xxx.xxx.xxx.xxx,xxx.xxx.xxx.xxx') when create table"); } + hosts = properties.get(HOSTS).trim(); seeds = hosts.split(","); - // TODO(ygl) validate the seeds? - userName = properties.get(USER); - if (Strings.isNullOrEmpty(userName)) { - userName = ""; + if (!Strings.isNullOrEmpty(properties.get(USER)) + && !Strings.isNullOrEmpty(properties.get(USER).trim())) { + userName = properties.get(USER).trim(); } - passwd = properties.get(PASSWORD); - if (passwd == null) { - passwd = ""; + if (!Strings.isNullOrEmpty(properties.get(PASSWORD)) + && !Strings.isNullOrEmpty(properties.get(PASSWORD).trim())) { + passwd = properties.get(PASSWORD).trim(); } - indexName = properties.get(INDEX); - if (Strings.isNullOrEmpty(indexName)) { + if (Strings.isNullOrEmpty(properties.get(INDEX)) + || Strings.isNullOrEmpty(properties.get(INDEX).trim())) { throw new DdlException("Index of ES table is null. " + "Please add properties('index'='xxxx') when create table"); } + indexName = properties.get(INDEX).trim(); - mappingType = properties.get(TYPE); - if (Strings.isNullOrEmpty(mappingType)) { - mappingType = "docs"; + if (!Strings.isNullOrEmpty(properties.get(TYPE)) + && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) { + mappingType = properties.get(TYPE).trim(); } } diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 6eb6eae2ceae52..790b976ebb0d44 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -156,6 +156,9 @@ private void assignBackends() throws UserException { // TODO (ygl) should not get all shards, prune unrelated shard private List getShardLocations() throws UserException { // has to get partition info from es state not from table because the partition info is generated from es cluster state dynamically + if (esTableState == null) { + throw new UserException("EsTable shard info has not been synced, wait some time or check log"); + } Collection partitionIds = partitionPrune(esTableState.getPartitionInfo()); List selectedIndex = Lists.newArrayList(); ArrayList unPartitionedIndices = Lists.newArrayList();