From af004abcf40a5792bf974e33c273fe869732e3ea Mon Sep 17 00:00:00 2001 From: yiguolei Date: Tue, 19 Mar 2019 18:47:53 +0800 Subject: [PATCH 1/8] Add EsTableDescriptor in be --- be/src/runtime/descriptors.cpp | 16 ++++++++++++++++ be/src/runtime/descriptors.h | 8 ++++++++ .../org/apache/doris/planner/EsScanNode.java | 3 +++ 3 files changed, 27 insertions(+) diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 7304318bfc8b8a..1b0b6c0a7dc25f 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -152,6 +152,19 @@ std::string BrokerTableDescriptor::debug_string() const { return out.str(); } +EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc) { +} + +EsTableDescriptor::~EsTableDescriptor() { +} + +std::string EsTableDescriptor::debug_string() const { + std::stringstream out; + out << "EsTable(" << TableDescriptor::debug_string() << ")"; + return out.str(); +} + KuduTableDescriptor::KuduTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc), table_name_(tdesc.kuduTable.table_name), @@ -483,6 +496,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb case TTableType::BROKER_TABLE: desc = pool->add(new BrokerTableDescriptor(tdesc)); break; + case TTableType::ES_TABLE: + desc = pool->add(new EsTableDescriptor(tdesc)); + break; default: DCHECK(false) << "invalid table type: " << tdesc.tableType; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 7dd41b6a133924..15219e09ec2944 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -241,6 +241,14 @@ public : private : }; +class EsTableDescriptor : public TableDescriptor { +public : + EsTableDescriptor(const TTableDescriptor& tdesc); + virtual ~EsTableDescriptor(); + virtual std::string debug_string() const; +private : +}; + // Descriptor for a KuduTable class KuduTableDescriptor : public TableDescriptor { public: diff --git 
a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 6eb6eae2ceae52..790b976ebb0d44 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -156,6 +156,9 @@ private void assignBackends() throws UserException { // TODO (ygl) should not get all shards, prune unrelated shard private List getShardLocations() throws UserException { // has to get partition info from es state not from table because the partition info is generated from es cluster state dynamically + if (esTableState == null) { + throw new UserException("EsTable shard info has not been synced, wait some time or check log"); + } Collection partitionIds = partitionPrune(esTableState.getPartitionInfo()); List selectedIndex = Lists.newArrayList(); ArrayList unPartitionedIndices = Lists.newArrayList(); From 1175b939a85afe36be07bc65ff1eb1aba61c4d5f Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 21 Mar 2019 12:30:01 +0800 Subject: [PATCH 2/8] Close pushdown conjunct ctxs on close --- be/src/exec/es_scan_node.cpp | 5 +++-- be/src/exec/es_scan_node.h | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp index b4ae0a8e5a499c..17954ccd918070 100644 --- a/be/src/exec/es_scan_node.cpp +++ b/be/src/exec/es_scan_node.cpp @@ -186,6 +186,7 @@ Status EsScanNode::open(RuntimeState* state) { for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) { int conjunct_index = predicate_to_conjunct[i]; if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) { + _pushdown_conjunct_ctxs.push_back(*(_conjunct_ctxs.begin() + conjunct_index)); _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index); } } @@ -259,7 +260,8 @@ Status EsScanNode::close(RuntimeState* state) { VLOG(1) << "EsScanNode::Close"; RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); 
SCOPED_TIMER(_runtime_profile->total_time_counter()); - + Expr::close(_pushdown_conjunct_ctxs, state); + RETURN_IF_ERROR(ExecNode::close(state)); for (int i = 0; i < _addresses.size(); ++i) { TExtCloseParams params; params.__set_scan_handle(_scan_handles[i]); @@ -307,7 +309,6 @@ Status EsScanNode::close(RuntimeState* state) { #endif } - RETURN_IF_ERROR(ExecNode::close(state)); return Status::OK; } diff --git a/be/src/exec/es_scan_node.h b/be/src/exec/es_scan_node.h index 64c3abc0edb57c..38071f4f27dce1 100644 --- a/be/src/exec/es_scan_node.h +++ b/be/src/exec/es_scan_node.h @@ -83,6 +83,7 @@ class EsScanNode : public ScanNode { std::vector _addresses; std::vector _scan_handles; std::vector _offsets; + std::vector _pushdown_conjunct_ctxs; }; } From 59178288f5d4e09db1a0dc9db800351318f45c68 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 21 Mar 2019 16:44:46 +0800 Subject: [PATCH 3/8] Get string value from expr error --- be/src/exec/es_scan_node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp index 17954ccd918070..eac7eab0647f4b 100644 --- a/be/src/exec/es_scan_node.cpp +++ b/be/src/exec/es_scan_node.cpp @@ -578,7 +578,7 @@ bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* l case TExprNodeType::STRING_LITERAL: { TStringLiteral string_literal; void* value = context->get_value(expr, NULL); - string_literal.__set_value(*reinterpret_cast(value)); + string_literal.__set_value((reinterpret_cast(value))->debug_string()); literal->__set_string_literal(string_literal); return true; } From f57992250f03af7527d8216244ae32bd5fa184d9 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 21 Mar 2019 17:34:04 +0800 Subject: [PATCH 4/8] set int value type according to int8 or int16 or int32 --- be/src/exec/es_scan_node.cpp | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp index 
eac7eab0647f4b..4b94a328ae9ef0 100644 --- a/be/src/exec/es_scan_node.cpp +++ b/be/src/exec/es_scan_node.cpp @@ -571,7 +571,24 @@ bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* l case TExprNodeType::INT_LITERAL: { TIntLiteral int_literal; void* value = context->get_value(expr, NULL); - int_literal.__set_value(*reinterpret_cast(value)); + int32_t int_val = 0; + switch (expr->type().type) { + case TYPE_TINYINT: { + int_val = *reinterpret_cast(value); + break; + } + case TYPE_SMALLINT: { + int_val = *reinterpret_cast(value); + break; + } + case TYPE_INT: { + int_val = *reinterpret_cast(value); + break; + } + default: + return false; + } + int_literal.__set_value(int_val); literal->__set_int_literal(int_literal); return true; } From df8470fd1ad3910492f135b2a6bc308ba03f78ba Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 21 Mar 2019 19:04:45 +0800 Subject: [PATCH 5/8] Convert double to float when receive double literal --- be/src/exec/es_scan_node.cpp | 13 +++++++++- .../org/apache/doris/catalog/EsTable.java | 25 ++++++++----------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp index 4b94a328ae9ef0..b29a64bd020d78 100644 --- a/be/src/exec/es_scan_node.cpp +++ b/be/src/exec/es_scan_node.cpp @@ -564,7 +564,18 @@ bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* l case TExprNodeType::FLOAT_LITERAL: { TFloatLiteral float_literal; void* value = context->get_value(expr, NULL); - float_literal.__set_value(*reinterpret_cast(value)); + switch (expr->type().type) { + case TYPE_FLOAT: { + float_literal.__set_value(*reinterpret_cast(value)); + break; + } + case TYPE_DOUBLE: { + float_literal.__set_value(*((double *)value)); + break; + } + default: + return false; + } literal->__set_float_literal(float_literal); return true; } diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java 
b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index a0d930916411a9..fcf95050de41c2 100644 --- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -50,10 +50,10 @@ public class EsTable extends Table { private String hosts; private String[] seeds; - private String userName; - private String passwd; + private String userName = ""; + private String passwd = ""; private String indexName; - private String mappingType = "doc"; + private String mappingType = "_doc"; // only save the partition definition, save the partition key, // partition list is got from es cluster dynamically and is saved in esTableState private PartitionInfo partitionInfo; @@ -77,7 +77,7 @@ private void validate(Map properties) throws DdlException { + "they are: hosts, user, password, index"); } - hosts = properties.get(HOSTS); + hosts = properties.get(HOSTS).trim(); if (Strings.isNullOrEmpty(hosts)) { throw new DdlException("Hosts of ES table is null. " + "Please add properties('hosts'='xxx.xxx.xxx.xxx,xxx.xxx.xxx.xxx') when create table"); @@ -85,25 +85,22 @@ private void validate(Map properties) throws DdlException { seeds = hosts.split(","); // TODO(ygl) validate the seeds? - userName = properties.get(USER); - if (Strings.isNullOrEmpty(userName)) { - userName = ""; + if (!Strings.isNullOrEmpty(properties.get(USER).trim())) { + userName = properties.get(USER).trim(); } - passwd = properties.get(PASSWORD); - if (passwd == null) { - passwd = ""; + if (!Strings.isNullOrEmpty(properties.get(PASSWORD).trim())) { + passwd = properties.get(PASSWORD).trim(); } - indexName = properties.get(INDEX); + indexName = properties.get(INDEX).trim(); if (Strings.isNullOrEmpty(indexName)) { throw new DdlException("Index of ES table is null. 
" + "Please add properties('index'='xxxx') when create table"); } - mappingType = properties.get(TYPE); - if (Strings.isNullOrEmpty(mappingType)) { - mappingType = "docs"; + if (!Strings.isNullOrEmpty(properties.get(TYPE).trim())) { + mappingType = properties.get(TYPE).trim(); } } From 9d40f8332889d9ba6117ec771222dab0cae84961 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Fri, 22 Mar 2019 10:56:30 +0800 Subject: [PATCH 6/8] Change from unix time param from int to int64 --- be/src/runtime/datetime_value.cpp | 2 +- be/src/runtime/datetime_value.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/datetime_value.cpp b/be/src/runtime/datetime_value.cpp index d3e4f0e50be035..b45e9bc578afb0 100644 --- a/be/src/runtime/datetime_value.cpp +++ b/be/src/runtime/datetime_value.cpp @@ -1541,7 +1541,7 @@ int DateTimeValue::unix_timestamp() const { return seconds; } -bool DateTimeValue::from_unixtime(int seconds) { +bool DateTimeValue::from_unixtime(int64_t seconds) { if (seconds < 0) { return false; } diff --git a/be/src/runtime/datetime_value.h b/be/src/runtime/datetime_value.h index 9994b3d26bff98..3ae5c5af4ad50b 100644 --- a/be/src/runtime/datetime_value.h +++ b/be/src/runtime/datetime_value.h @@ -334,7 +334,7 @@ class DateTimeValue { int unix_timestamp() const; - bool from_unixtime(int); + bool from_unixtime(int64_t); bool operator==(const DateTimeValue& other) const { // NOTE: This is not same with MySQL. 
From 5333f3f300fb60c090c0b08698f238783cfa1767 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Fri, 22 Mar 2019 13:04:17 +0800 Subject: [PATCH 7/8] Fix bug: the output value is wrong when the date's second is zero --- be/src/exec/es_scan_node.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp index b29a64bd020d78..f0fb17e2f32bfd 100644 --- a/be/src/exec/es_scan_node.cpp +++ b/be/src/exec/es_scan_node.cpp @@ -582,7 +582,7 @@ bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* l case TExprNodeType::INT_LITERAL: { TIntLiteral int_literal; void* value = context->get_value(expr, NULL); - int32_t int_val = 0; + int64_t int_val = 0; switch (expr->type().type) { case TYPE_TINYINT: { int_val = *reinterpret_cast(value); @@ -596,6 +596,10 @@ bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* l int_val = *reinterpret_cast(value); break; } + case TYPE_BIGINT: { + int_val = *reinterpret_cast(value); + break; + } default: return false; } @@ -812,6 +816,7 @@ Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple, !reinterpret_cast(slot)->from_unixtime(col.long_vals[val_idx])) { return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATETIME")); } + reinterpret_cast(slot)->set_type(TIME_DATETIME); break; } case TYPE_DECIMAL: { From 0ea552d6dc0f17ef40419fe9c7098576017f8a69 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Fri, 22 Mar 2019 18:37:40 +0800 Subject: [PATCH 8/8] Fix bug: index name may be null --- .../org/apache/doris/catalog/EsTable.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index fcf95050de41c2..fdcb7c843adfc5 100644 --- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -77,29 +77,33 @@ 
private void validate(Map properties) throws DdlException { + "they are: hosts, user, password, index"); } - hosts = properties.get(HOSTS).trim(); - if (Strings.isNullOrEmpty(hosts)) { + if (Strings.isNullOrEmpty(properties.get(HOSTS)) + || Strings.isNullOrEmpty(properties.get(HOSTS).trim())) { throw new DdlException("Hosts of ES table is null. " + "Please add properties('hosts'='xxx.xxx.xxx.xxx,xxx.xxx.xxx.xxx') when create table"); } + hosts = properties.get(HOSTS).trim(); seeds = hosts.split(","); - // TODO(ygl) validate the seeds? - if (!Strings.isNullOrEmpty(properties.get(USER).trim())) { + if (!Strings.isNullOrEmpty(properties.get(USER)) + && !Strings.isNullOrEmpty(properties.get(USER).trim())) { userName = properties.get(USER).trim(); } - if (!Strings.isNullOrEmpty(properties.get(PASSWORD).trim())) { + if (!Strings.isNullOrEmpty(properties.get(PASSWORD)) + && !Strings.isNullOrEmpty(properties.get(PASSWORD).trim())) { passwd = properties.get(PASSWORD).trim(); } - indexName = properties.get(INDEX).trim(); - if (Strings.isNullOrEmpty(indexName)) { + if (Strings.isNullOrEmpty(properties.get(INDEX)) + || Strings.isNullOrEmpty(properties.get(INDEX).trim())) { throw new DdlException("Index of ES table is null. " + "Please add properties('index'='xxxx') when create table"); } + indexName = properties.get(INDEX).trim(); - if (!Strings.isNullOrEmpty(properties.get(TYPE).trim())) { + if (!Strings.isNullOrEmpty(properties.get(TYPE)) + && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) { mappingType = properties.get(TYPE).trim(); } }