diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1538638502289c..fbb07ca95671f4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -549,6 +549,15 @@ namespace config {
     // Soft memory limit as a fraction of hard memory limit.
     CONF_Double(soft_mem_limit_frac, "0.9");
+
+    // Maximum size of the query result cache, in MB
+    CONF_Int32(cache_max_size, "256");
+
+    // Cache memory is pruned once it exceeds cache_max_size + cache_elasticity_size
+    CONF_Int32(cache_elasticity_size, "128");
+
+    // Maximum number of cached partitions per SQL
+    CONF_Int32(cache_max_partition_count, "1024");
 } // namespace config
 } // namespace doris
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index e2ea260163ef12..0176d74c412976 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -106,6 +106,8 @@ set(RUNTIME_FILES
     mysql_result_writer.cpp
     memory/system_allocator.cpp
     memory/chunk_allocator.cpp
+    cache/result_node.cpp
+    cache/result_cache.cpp
 )
 
 if (WITH_MYSQL)
diff --git a/be/src/runtime/cache/cache_utils.h b/be/src/runtime/cache/cache_utils.h
new file mode 100644
index 00000000000000..192289f9b1e0b8
--- /dev/null
+++ b/be/src/runtime/cache/cache_utils.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
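+
+// Shared helpers for the BE result cache: read/write lock aliases over
+// boost::shared_mutex, plus CacheStat, which records when an entry was
+// cached, last updated and last read, so that pruning can evict the least
+// recently read partitions first.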
+
+#ifndef DORIS_BE_SRC_RUNTIME_CACHE_UTILS_H
+#define DORIS_BE_SRC_RUNTIME_CACHE_UTILS_H
+
+#include <sys/time.h>
+
+#include <boost/thread/shared_mutex.hpp>
+
+namespace doris {
+
+typedef boost::shared_lock<boost::shared_mutex> CacheReadLock;
+typedef boost::unique_lock<boost::shared_mutex> CacheWriteLock;
+
+//#ifndef PARTITION_CACHE_DEV
+//#define PARTITION_CACHE_DEV
+//#endif
+
+struct CacheStat {
+    static const uint32 DAY_SECONDS = 86400;
+    long cache_time;
+    long last_update_time;
+    long last_read_time;
+    uint32 read_count;
+    CacheStat() { init(); }
+
+    inline long cache_time_second() {
+        struct timeval tv;
+        gettimeofday(&tv, NULL);
+        return tv.tv_sec;
+    }
+
+    void init() {
+        cache_time = 0;
+        last_update_time = 0;
+        last_read_time = 0;
+        read_count = 0;
+    }
+
+    void update() {
+        last_update_time = cache_time_second();
+        if (cache_time == 0) {
+            cache_time = last_update_time;
+        }
+        last_read_time = last_update_time;
+        read_count++;
+    }
+
+    void query() {
+        last_read_time = cache_time_second();
+        read_count++;
+    }
+
+    double last_query_day() { return (cache_time_second() - last_read_time) * 1.0 / DAY_SECONDS; }
+
+    double avg_query_count() {
+        return read_count * DAY_SECONDS * 1.0 / (cache_time_second() - last_read_time + 1);
+    }
+};
+
+} // namespace doris
+#endif
diff --git a/be/src/runtime/cache/result_cache.cpp b/be/src/runtime/cache/result_cache.cpp
new file mode 100644
index 00000000000000..ca4b53f747e996
--- /dev/null
+++ b/be/src/runtime/cache/result_cache.cpp
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
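+
+// ResultCache keeps one ResultNode per SQL signature (sql_key) in a hash
+// map plus a doubly linked list ordered by recency: fetch() and update()
+// move a node to the tail, and prune() walks from the head, evicting the
+// least recently read partitions until the cache is back under _max_size.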
+#include "gen_cpp/internal_service.pb.h" +#include "runtime/cache/result_cache.h" +#include "util/doris_metrics.h" + +namespace doris { + +/* +* Remove the tail node of link +*/ +ResultNode* ResultNodeList::pop() { + remove(_head); + return _head; +} + +void ResultNodeList::remove(ResultNode* node) { + if (!node) return; + if (node == _head) _head = node->get_next(); + if (node == _tail) _tail = node->get_prev(); + node->unlink(); + _node_count--; +} + +void ResultNodeList::push(ResultNode* node) { + if (!node) return; + if (!_head) _head = node; + node->append(_tail); + _tail = node; + _node_count++; +} + +void ResultNodeList::move_tail(ResultNode* node) { + if (!node || node == _tail) return; + if (!_head) + _head = node; + else if (node == _head) + _head = node->get_next(); + node->unlink(); + node->append(_tail); + _tail = node; +} + +void ResultNodeList::clear() { + LOG(INFO) << "clear result node list."; + while (_head) { + ResultNode* tmp_node = _head->get_next(); + _head->clear(); + SAFE_DELETE(_head); + _head = tmp_node; + } + _node_count = 0; +} + +void ResultCache::update(const PUpdateCacheRequest* request, PCacheResponse* response) { + ResultNode* node; + PCacheStatus status; + bool update_first = false; + UniqueId sql_key = request->sql_key(); + LOG(INFO) << "update cache, sql key:" << sql_key; + + CacheWriteLock write_lock(_cache_mtx); + auto it = _node_map.find(sql_key); + if (it != _node_map.end()) { + node = it->second; + _cache_size -= node->get_data_size(); + _partition_count -= node->get_partition_count(); + status = node->update_partition(request, update_first); + } else { + node = _node_list.new_node(sql_key); + status = node->update_partition(request, update_first); + _node_list.push(node); + _node_map[sql_key] = node; + _node_count += 1; + } + if (update_first) { + _node_list.move_tail(node); + } + _cache_size += node->get_data_size(); + _partition_count += node->get_partition_count(); + response->set_status(status); + + prune(); + update_monitor(); +} + +void ResultCache::fetch(const PFetchCacheRequest* request, PFetchCacheResult* result) { + bool hit_first = false; + ResultNodeMap::iterator node_it; + const UniqueId sql_key = request->sql_key(); + LOG(INFO) << "fetch cache, sql key:" << sql_key; + { + CacheReadLock read_lock(_cache_mtx); + node_it = _node_map.find(sql_key); + if (node_it == _node_map.end()) { + result->set_status(PCacheStatus::NO_SQL_KEY); + LOG(INFO) << "no such sql key:" << sql_key; + return; + } + ResultNode* node = node_it->second; + PartitionRowBatchList part_rowbatch_list; + PCacheStatus status = node->fetch_partition(request, part_rowbatch_list, hit_first); + + for (auto part_it = part_rowbatch_list.begin(); part_it != part_rowbatch_list.end(); part_it++) { + PCacheValue* srcValue = (*part_it)->get_value(); + if (srcValue != NULL) { + PCacheValue* value = result->add_value(); + value->CopyFrom(*srcValue); + LOG(INFO) << "fetch cache partition key:" << srcValue->param().partition_key(); + } else { + LOG(WARNING) << "prowbatch of cache is null"; + status = PCacheStatus::EMPTY_DATA; + break; + } + } + result->set_status(status); + } + + if (hit_first) { + { + CacheWriteLock write_lock(_cache_mtx); + _node_list.move_tail(node_it->second); + } + } +} + +bool ResultCache::contains(const UniqueId& sql_key) { + CacheReadLock read_lock(_cache_mtx); + return _node_map.find(sql_key) != _node_map.end(); +} + +void ResultCache::clear(const PClearCacheRequest* request, PCacheResponse* response) { + LOG(INFO) << "clear cache type" << 
request->clear_type() + << ", node size:" << _node_list.get_node_count() << ", map size:" << _node_map.size(); + CacheWriteLock write_lock(_cache_mtx); + //0 clear, 1 prune, 2 before_time,3 sql_key + switch (request->clear_type()) { + case 0: + _node_list.clear(); + _node_map.clear(); + _cache_size = 0; + _node_count = 0; + _partition_count = 0; + case 1: + prune(); + default: + break; + } + update_monitor(); + response->set_status(PCacheStatus::CACHE_OK); +} + +//private method +ResultNode* find_min_time_node(ResultNode* result_node) { + if (result_node->get_prev()) { + if (result_node->get_prev()->first_partition_last_time() <= + result_node->first_partition_last_time()) { + return result_node->get_prev(); + } + } + + if (result_node->get_next()) { + if (result_node->get_next()->first_partition_last_time() < + result_node->first_partition_last_time()) { + return result_node->get_next(); + } + } + return result_node; +} + +/* +* Two-dimensional array, prune the min last_read_time PartitionRowBatch. +* The following example is the last read time array. +* 1 and 2 is the read time, nodes with pruning read time < 3 +* Before: +* 1,2 //_head ResultNode* +* 1,2,3,4,5 +* 2,4,3,6,8 +* 5,7,9,11,13 //_tail ResultNode* +* After: +* 4,5 //_head +* 4,3,6,8 +* 5,7,9,11,13 //_tail +*/ +void ResultCache::prune() { + if (_cache_size <= (_max_size + _elasticity_size)) { + return; + } + LOG(INFO) << "begin prune cache, cache_size : " << _cache_size << ", max_size : " << _max_size + << ", elasticity_size : " << _elasticity_size; + ResultNode* result_node = _node_list.get_head(); + while (_cache_size > _max_size) { + if (result_node == NULL) { + break; + } + result_node = find_min_time_node(result_node); + _cache_size -= result_node->prune_first(); + if (result_node->get_data_size() == 0) { + ResultNode* next_node; + if (result_node->get_next()) { + next_node = result_node->get_next(); + } else if (result_node->get_prev()) { + next_node = result_node->get_prev(); + } else { + next_node = _node_list.get_head(); + } + remove(result_node); + result_node = next_node; + } + } + LOG(INFO) << "finish prune, cache_size : " << _cache_size; + _node_count = _node_map.size(); + _cache_size = 0; + _partition_count = 0; + for (auto node_it = _node_map.begin(); node_it != _node_map.end(); node_it++) { + _partition_count += node_it->second->get_partition_count(); + _cache_size += node_it->second->get_data_size(); + } +} + +void ResultCache::remove(ResultNode* result_node) { + auto node_it = _node_map.find(result_node->get_sql_key()); + if (node_it != _node_map.end()) { + _node_map.erase(node_it); + _node_list.remove(result_node); + _node_list.delete_node(&result_node); + } +} + +void ResultCache::update_monitor() { + DorisMetrics::instance()->cache_memory_total.set_value(_cache_size); + DorisMetrics::instance()->cache_sql_total.set_value(_node_count); + DorisMetrics::instance()->cache_partition_total.set_value(_partition_count); +} + +} // namespace doris + diff --git a/be/src/runtime/cache/result_cache.h b/be/src/runtime/cache/result_cache.h new file mode 100644 index 00000000000000..42326af651dc1b --- /dev/null +++ b/be/src/runtime/cache/result_cache.h @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_RUNTIME_RESULT_CACHE_H
+#define DORIS_BE_SRC_RUNTIME_RESULT_CACHE_H
+
+#include <list>
+#include <unordered_map>
+
+#include <boost/thread/shared_mutex.hpp>
+
+#include "common/config.h"
+#include "runtime/cache/cache_utils.h"
+#include "runtime/cache/result_node.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/row_batch.h"
+#include "runtime/tuple_row.h"
+
+namespace doris {
+
+typedef std::unordered_map<UniqueId, ResultNode*> ResultNodeMap;
+
+// a doubly linked list class
+class ResultNodeList {
+public:
+    ResultNodeList() : _head(NULL), _tail(NULL), _node_count(0) {}
+    virtual ~ResultNodeList() {}
+
+    ResultNode* new_node(const UniqueId& sql_key) { return new ResultNode(sql_key); }
+
+    void delete_node(ResultNode** node) { SAFE_DELETE(*node); }
+
+    ResultNode* pop();
+    void move_tail(ResultNode* node);
+    //Just remove the node from the list, do not delete it
+    void remove(ResultNode* node);
+    void push(ResultNode* node);
+    void clear();
+
+    ResultNode* get_head() const { return _head; }
+
+    ResultNode* get_tail() const { return _tail; }
+
+    size_t get_node_count() const { return _node_count; }
+
+private:
+    ResultNode* _head;
+    ResultNode* _tail;
+    size_t _node_count;
+};
+
+class ResultCache {
+public:
+    ResultCache(int32 max_size, int32 elasticity_size) {
+        _max_size = max_size * 1024 * 1024;
+        _elasticity_size = elasticity_size * 1024 * 1024;
+        _cache_size = 0;
+        _node_count = 0;
+        _partition_count = 0;
+    }
+
+    virtual ~ResultCache() {}
+
+    void update(const PUpdateCacheRequest* request, PCacheResponse* response);
+    void fetch(const PFetchCacheRequest* request, PFetchCacheResult* result);
+    bool contains(const UniqueId& sql_key);
+    void clear(const PClearCacheRequest* request, PCacheResponse* response);
+
+    size_t get_cache_size() { return _cache_size; }
+
+private:
+    void prune();
+    void remove(ResultNode* result_node);
+    void update_monitor();
+
+    //Reads may run concurrently; updates and pruning are single threaded
+    //(only within a single BE, the FE is not affected)
+    mutable boost::shared_mutex _cache_mtx;
+    ResultNodeMap _node_map;
+    //List of result nodes keyed by sql key, least recently used at the head,
+    //most recently used at the tail
+    ResultNodeList _node_list;
+    size_t _cache_size;
+    size_t _max_size;
+    size_t _elasticity_size;
+    size_t _node_count;
+    size_t _partition_count;
+
+private:
+    ResultCache();
+    ResultCache(const ResultCache&);
+    const ResultCache& operator=(const ResultCache&);
+};
+
+} // namespace doris
+#endif
diff --git a/be/src/runtime/cache/result_node.cpp b/be/src/runtime/cache/result_node.cpp
new file mode 100644
index 00000000000000..a11b43fbaf397f
--- /dev/null
+++ b/be/src/runtime/cache/result_node.cpp
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "gen_cpp/internal_service.pb.h"
+#include "runtime/cache/result_node.h"
+#include "runtime/cache/cache_utils.h"
+
+namespace doris {
+
+bool compare_partition(const PartitionRowBatch* left_node, const PartitionRowBatch* right_node) {
+    return left_node->get_partition_key() < right_node->get_partition_key();
+}
+
+//Replace the cached row batch; _data_size only counts the size of the PCacheValue
+void PartitionRowBatch::set_row_batch(const PCacheValue& value) {
+    if (_cache_value != NULL && !check_newer(value.param())) {
+        LOG(WARNING) << "set old version data, cache ver:" << _cache_value->param().last_version()
+                     << ", cache time:" << _cache_value->param().last_version_time()
+                     << ", setdata ver:" << value.param().last_version()
+                     << ", setdata time:" << value.param().last_version_time();
+        return;
+    }
+    SAFE_DELETE(_cache_value);
+    _cache_value = new PCacheValue(value);
+    //reset rather than accumulate: the old value was just deleted
+    _data_size = _cache_value->data_size();
+    _cache_stat.update();
+    LOG(INFO) << "finish set row batch, row num:" << _cache_value->row_size()
+              << ", data size:" << _data_size;
+}
+
+bool PartitionRowBatch::is_hit_cache(const PCacheParam& param) {
+    if (param.partition_key() != _partition_key) {
+        return false;
+    }
+    if (!check_match(param)) {
+        return false;
+    }
+    _cache_stat.query();
+    return true;
+}
+
+void PartitionRowBatch::clear() {
+    LOG(INFO) << "clear partition rowbatch.";
+    SAFE_DELETE(_cache_value);
+    _partition_key = 0;
+    _data_size = 0;
+    _cache_stat.init();
+}
+
+PCacheStatus ResultNode::update_partition(const PUpdateCacheRequest* request, bool& update_first) {
+    update_first = false;
+    if (_sql_key != request->sql_key()) {
+        LOG(INFO) << "no match sql_key:" << request->sql_key().hi() << ":" << request->sql_key().lo();
+        return PCacheStatus::PARAM_ERROR;
+    }
+
+    if (request->value_size() > config::cache_max_partition_count) {
+        LOG(WARNING) << "too many partitions, size:" << request->value_size();
+        return PCacheStatus::PARAM_ERROR;
+    }
+
+    //Only one thread per SQL key can update the cache
+    CacheWriteLock write_lock(_node_mtx);
+
+    int64 first_key = kint64max;
+    if (_partition_list.size() == 0) {
+        update_first = true;
+    } else {
+        first_key = (*(_partition_list.begin()))->get_partition_key();
+    }
+    PartitionRowBatch* partition = NULL;
+    for (int i = 0; i < request->value_size(); i++) {
+        const PCacheValue& value = request->value(i);
+        int64 partition_key = value.param().partition_key();
+        if (!update_first && partition_key <= first_key) {
+            update_first = true;
+        }
+        auto it = _partition_map.find(partition_key);
+        if (it == _partition_map.end()) {
+            partition = new PartitionRowBatch(partition_key);
+            partition->set_row_batch(value);
+            _partition_map[partition_key] = partition;
+            _partition_list.push_back(partition);
+#ifdef PARTITION_CACHE_DEV
+            LOG(INFO) << "add index:" << i << ", pkey:" << partition->get_partition_key()
+                      << ", list size:" << _partition_list.size()
+                      << ", map size:" << _partition_map.size();
+#endif
+        } else {
+            partition = it->second;
+            _data_size -= partition->get_data_size();
+            partition->set_row_batch(value);
+#ifdef PARTITION_CACHE_DEV
+            LOG(INFO) << "update index:" << i << ", pkey:" << partition->get_partition_key()
+                      << ", list size:" << _partition_list.size()
+                      << ", map size:" << _partition_map.size();
+#endif
+        }
+        _data_size += partition->get_data_size();
+    }
+    _partition_list.sort(compare_partition);
+    LOG(INFO) << "finish update batches:" << _partition_list.size();
+    while (config::cache_max_partition_count > 0 &&
+           _partition_list.size() > config::cache_max_partition_count) {
+        if (prune_first() == 0) {
+            break;
+        }
+    }
+    return PCacheStatus::CACHE_OK;
+}
+
+/**
+* Only range queries on the partition key are supported; disjoint partition key
+* lookups are not, because a query can be split into at most two parts: part1
+* gets its data from the cache, part2 fetches data from the BE through the scan node.
+* Partition cache     : 20191211-20191215
+* Hit cache parameter : [20191211 - 20191215], [20191212 - 20191214], [20191212 - 20191216], [20191210 - 20191215]
+* Miss cache parameter: [20191210 - 20191216]
+*/
+PCacheStatus ResultNode::fetch_partition(const PFetchCacheRequest* request,
+                                         PartitionRowBatchList& row_batch_list, bool& hit_first) {
+    hit_first = false;
+    if (request->param_size() == 0) {
+        return PCacheStatus::PARAM_ERROR;
+    }
+
+    CacheReadLock read_lock(_node_mtx);
+
+    if (_partition_list.size() == 0) {
+        return PCacheStatus::NO_PARTITION_KEY;
+    }
+
+    if (request->param(0).partition_key() > (*_partition_list.rbegin())->get_partition_key() ||
+        request->param(request->param_size() - 1).partition_key() <
+                (*_partition_list.begin())->get_partition_key()) {
+        return PCacheStatus::NO_PARTITION_KEY;
+    }
+
+    bool find = false;
+    int begin_idx = -1, end_idx = -1, param_idx = 0;
+    auto begin_it = _partition_list.end();
+    auto end_it = _partition_list.end();
+    auto part_it = _partition_list.begin();
+
+    PCacheStatus status = PCacheStatus::CACHE_OK;
+    while (param_idx < request->param_size() && part_it != _partition_list.end()) {
+#ifdef PARTITION_CACHE_DEV
+        LOG(INFO) << "Param index : " << param_idx
+                  << ", param part key : " << request->param(param_idx).partition_key()
+                  << ", batch part key : " << (*part_it)->get_partition_key();
+#endif
+        if (!find) {
+            while (part_it != _partition_list.end() &&
+                   request->param(param_idx).partition_key() > (*part_it)->get_partition_key()) {
+                part_it++;
+            }
+            //either cursor may run off the end while skipping non-overlapping keys
+            if (part_it == _partition_list.end()) {
+                break;
+            }
+            while (param_idx < request->param_size() &&
+                   request->param(param_idx).partition_key() < (*part_it)->get_partition_key()) {
+                param_idx++;
+            }
+            if (param_idx >= request->param_size()) {
+                break;
+            }
+            if (request->param(param_idx).partition_key() == (*part_it)->get_partition_key()) {
+                find = true;
+            }
+        }
+        if (find) {
+#ifdef PARTITION_CACHE_DEV
+            LOG(INFO) << "Find! Param index : " << param_idx
+                      << ", param part key : " << request->param(param_idx).partition_key()
+                      << ", batch part key : " << (*part_it)->get_partition_key()
+                      << ", param part version : " << request->param(param_idx).last_version()
+                      << ", batch part version : " << (*part_it)->get_value()->param().last_version()
+                      << ", param part version time : " << request->param(param_idx).last_version_time()
+                      << ", batch part version time : " << (*part_it)->get_value()->param().last_version_time();
+#endif
+            if ((*part_it)->is_hit_cache(request->param(param_idx))) {
+                if (begin_idx < 0) {
+                    begin_idx = param_idx;
+                    begin_it = part_it;
+                }
+                end_idx = param_idx;
+                end_it = part_it;
+                param_idx++;
+                part_it++;
+            } else {
+                status = PCacheStatus::DATA_OVERDUE;
+                break;
+            }
+        }
+    }
+
+    if (begin_it == _partition_list.end() && end_it == _partition_list.end()) {
+        return status;
+    }
+
+    //[20191210 - 20191216] hits the cached partition range [20191212 - 20191214]; the SQL would have to be split into 3 parts!
+    if (begin_idx != 0 && end_idx != request->param_size() - 1) {
+        return PCacheStatus::INVALID_KEY_RANGE;
+    }
+    if (begin_it == _partition_list.begin()) {
+        hit_first = true;
+    }
+
+    while (true) {
+        row_batch_list.push_back(*begin_it);
+        if (begin_it == end_it) {
+            break;
+        }
+        begin_it++;
+    }
+    return status;
+}
+
+/*
+* Prune the first (oldest) partition result
+*/
+size_t ResultNode::prune_first() {
+    if (_partition_list.size() == 0) {
+        return 0;
+    }
+    PartitionRowBatch* part_node = *_partition_list.begin();
+    size_t prune_size = part_node->get_data_size();
+    //keep the map in sync with the list, otherwise it holds a dangling pointer
+    _partition_map.erase(part_node->get_partition_key());
+    _partition_list.erase(_partition_list.begin());
+    SAFE_DELETE(part_node);
+    _data_size -= prune_size;
+    return prune_size;
+}
+
+void ResultNode::clear() {
+    CacheWriteLock write_lock(_node_mtx);
+    LOG(INFO) << "clear result node:" << _sql_key;
+    _sql_key.hi = 0;
+    _sql_key.lo = 0;
+    for (auto it = _partition_list.begin(); it != _partition_list.end();) {
+        (*it)->clear();
+        delete *it;
+        it = _partition_list.erase(it);
+    }
+    _partition_map.clear();
+    _data_size = 0;
+}
+
+void ResultNode::append(ResultNode* tail) {
+    _prev = tail;
+    if (tail) tail->set_next(this);
+}
+
+void ResultNode::unlink() {
+    if (_next) {
+        _next->set_prev(_prev);
+    }
+    if (_prev) {
+        _prev->set_next(_next);
+    }
+    _next = NULL;
+    _prev = NULL;
+}
+
+} // namespace doris
diff --git a/be/src/runtime/cache/result_node.h b/be/src/runtime/cache/result_node.h
new file mode 100644
index 00000000000000..d8fd78fafc6929
--- /dev/null
+++ b/be/src/runtime/cache/result_node.h
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
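+
+// A ResultNode caches the result of one SQL statement as a sorted list of
+// PartitionRowBatch entries, one per partition key. A partition hit requires
+// the cached last_version/last_version_time to be at least as new as the
+// request's, and only a contiguous range of partition keys can be returned
+// (see the range-query comment in result_node.cpp).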
+
+#ifndef DORIS_BE_SRC_RUNTIME_RESULT_NODE_H
+#define DORIS_BE_SRC_RUNTIME_RESULT_NODE_H
+
+#include <list>
+
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/unordered_map.hpp>
+
+#include "common/config.h"
+#include "olap/olap_define.h"
+#include "runtime/cache/cache_utils.h"
+#include "runtime/mem_pool.h"
+#include "runtime/row_batch.h"
+#include "runtime/tuple_row.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+enum PCacheStatus;
+class PCacheParam;
+class PCacheValue;
+class PCacheResponse;
+class PFetchCacheRequest;
+class PFetchCacheResult;
+class PUpdateCacheRequest;
+class PClearCacheRequest;
+
+/*
+* Cache the data of one partition
+*/
+class PartitionRowBatch {
+public:
+    PartitionRowBatch(int64 partition_key)
+            : _partition_key(partition_key), _cache_value(NULL), _data_size(0) {}
+
+    ~PartitionRowBatch() {}
+
+    void set_row_batch(const PCacheValue& value);
+    bool is_hit_cache(const PCacheParam& param);
+    void clear();
+
+    int64 get_partition_key() const { return _partition_key; }
+
+    PCacheValue* get_value() { return _cache_value; }
+
+    size_t get_data_size() { return _data_size; }
+
+    const CacheStat* get_stat() const { return &_cache_stat; }
+
+private:
+    bool check_match(const PCacheParam& req_param) {
+        if (req_param.last_version() > _cache_value->param().last_version()) {
+            return false;
+        }
+        if (req_param.last_version_time() > _cache_value->param().last_version_time()) {
+            return false;
+        }
+        return true;
+    }
+
+    bool check_newer(const PCacheParam& up_param) {
+        //for the initial data of the sql cache
+        if (up_param.last_version() == 0 || up_param.last_version_time() == 0) {
+            return true;
+        }
+        if (up_param.last_version_time() > _cache_value->param().last_version_time()) {
+            return true;
+        }
+        if (up_param.last_version() > _cache_value->param().last_version()) {
+            return true;
+        }
+        return false;
+    }
+
+private:
+    int64 _partition_key;
+    PCacheValue* _cache_value;
+    size_t _data_size;
+    CacheStat _cache_stat;
+};
+
+typedef std::list<PartitionRowBatch*> PartitionRowBatchList;
+typedef boost::unordered_map<int64, PartitionRowBatch*> PartitionRowBatchMap;
+
+/*
+* Cache the result of one SQL statement, consisting of many partition row batches
+*/
+class ResultNode {
+public:
+    ResultNode() : _sql_key(0, 0), _prev(NULL), _next(NULL), _data_size(0) {}
+
+    ResultNode(const UniqueId& sql_key)
+            : _sql_key(sql_key), _prev(NULL), _next(NULL), _data_size(0) {}
+
+    virtual ~ResultNode() {}
+
+    PCacheStatus update_partition(const PUpdateCacheRequest* request, bool& update_first);
+    PCacheStatus fetch_partition(const PFetchCacheRequest* request,
+                                 PartitionRowBatchList& row_batch_list, bool& hit_first);
+
+    size_t prune_first();
+    void clear();
+
+    bool operator()(const ResultNode* left_node, const ResultNode* right_node) {
+        if (left_node->get_partition_count() == 0) {
+            return true;
+        }
+        if (right_node->get_partition_count() == 0) {
+            return false;
+        }
+        return left_node->get_first_stat()->last_read_time <
+               right_node->get_first_stat()->last_read_time;
+    }
+
+    ResultNode* get_prev() { return _prev; }
+
+    void set_prev(ResultNode* prev) { _prev = prev; }
+
+    ResultNode* get_next() { return _next; }
+
+    void set_next(ResultNode* next) { _next = next; }
+
+    void append(ResultNode* tail);
+
+    void unlink();
+
+    size_t get_partition_count() const { return _partition_list.size(); }
+
+    size_t get_data_size() const { return _data_size; }
+
+    UniqueId get_sql_key() { return _sql_key; }
+
+    bool sql_key_null() { return _sql_key.hi == 0 && _sql_key.lo == 0; }
+
+    void set_sql_key(const UniqueId& sql_key) { _sql_key = sql_key; }
+
+    long first_partition_last_time() const {
+        if (_partition_list.size() == 0) {
+            return 0;
+        }
+        const PartitionRowBatch* first = *(_partition_list.begin());
+        return first->get_stat()->last_read_time;
+    }
+
+    const CacheStat* get_first_stat() const {
+        if (_partition_list.size() == 0) {
+            return NULL;
+        }
+        return (*(_partition_list.begin()))->get_stat();
+    }
+
+    const CacheStat* get_last_stat() const {
+        if (_partition_list.size() == 0) {
+            return NULL;
+        }
+        return (*(_partition_list.rbegin()))->get_stat();
+    }
+
+private:
+    mutable boost::shared_mutex _node_mtx;
+    UniqueId _sql_key;
+    ResultNode* _prev;
+    ResultNode* _next;
+    size_t _data_size;
+    PartitionRowBatchList _partition_list;
+    PartitionRowBatchMap _partition_map;
+};
+
+} // namespace doris
+#endif
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index d22150da47b472..6bd8a809261817 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -21,7 +21,8 @@
 namespace doris {
 
-ExecEnv::ExecEnv() {}
+ExecEnv::ExecEnv() : _is_init(false) {
+}
 
 ExecEnv::~ExecEnv() {}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index a880419921d658..1f973489e19dc7 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -34,6 +34,7 @@ class EtlJobMgr;
 class EvHttpServer;
 class ExternalScanContextMgr;
 class FragmentMgr;
+class ResultCache;
 class LoadPathMgr;
 class LoadStreamMgr;
 class MemTracker;
@@ -55,6 +56,7 @@ class SmallFileMgr;
 class FileBlockManager;
 class PluginMgr;
+
 class BackendServiceClient;
 class FrontendServiceClient;
 class TPaloBrokerServiceClient;
@@ -113,6 +115,7 @@ class ExecEnv {
     PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
     CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
+    ResultCache* result_cache() { return _result_cache; }
     TMasterInfo* master_info() { return _master_info; }
     EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; }
     LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
@@ -147,6 +150,7 @@ class ExecEnv {
     void _init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit);
 
 private:
+    bool _is_init;
     std::vector<StorePath> _store_paths;
 
     // Leave protected so that subclasses can override
     ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
@@ -164,6 +168,7 @@ class ExecEnv {
     PriorityThreadPool* _etl_thread_pool = nullptr;
     CgroupsMgr* _cgroups_mgr = nullptr;
     FragmentMgr* _fragment_mgr = nullptr;
+    ResultCache* _result_cache = nullptr;
     TMasterInfo* _master_info = nullptr;
     EtlJobMgr* _etl_job_mgr = nullptr;
     LoadPathMgr* _load_path_mgr = nullptr;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 68dfdfd63813ad..99395a36fb900a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -19,6 +19,7 @@
 
 #include "common/config.h"
 #include "gen_cpp/BackendService.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "runtime/exec_env.h"
 #include "runtime/data_stream_mgr.h"
 #include "runtime/fragment_mgr.h"
@@ -222,6 +223,32 @@ void PInternalServiceImpl::get_info(
     Status::OK().to_protobuf(response->mutable_status());
 }
 
+template <typename T>
+void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* controller,
+                                           const PUpdateCacheRequest* request,
+                                           PCacheResponse* response,
+                                           google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    _exec_env->result_cache()->update(request, response);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* controller,
+                                          const PFetchCacheRequest* request,
+                                          PFetchCacheResult* result,
+                                          google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    _exec_env->result_cache()->fetch(request, result);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* controller,
+                                          const PClearCacheRequest* request,
+                                          PCacheResponse* response,
+                                          google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    _exec_env->result_cache()->clear(request, response);
+}
+
 template class PInternalServiceImpl<PBackendService>;
 template class PInternalServiceImpl<palo::PInternalService>;
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index e27c2c3a4015f5..03fbcfedea99dd 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -21,6 +21,7 @@
 #include "gen_cpp/internal_service.pb.h"
 #include "gen_cpp/palo_internal_service.pb.h"
 #include "util/priority_thread_pool.hpp"
+#include "runtime/cache/result_cache.h"
 
 namespace brpc {
 class Controller;
@@ -86,6 +87,20 @@ class PInternalServiceImpl : public T {
                   PProxyResult* response,
                   google::protobuf::Closure* done) override;
 
+    void update_cache(google::protobuf::RpcController* controller,
+                      const PUpdateCacheRequest* request,
+                      PCacheResponse* response,
+                      google::protobuf::Closure* done) override;
+
+    void fetch_cache(google::protobuf::RpcController* controller,
+                     const PFetchCacheRequest* request,
+                     PFetchCacheResult* result,
+                     google::protobuf::Closure* done) override;
+
+    void clear_cache(google::protobuf::RpcController* controller,
+                     const PClearCacheRequest* request,
+                     PCacheResponse* response,
+                     google::protobuf::Closure* done) override;
+
 private:
     Status _exec_plan_fragment(brpc::Controller* cntl);
 
 private:
diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h
index dc2d9383929558..c5c085906ffddd 100644
--- a/be/src/util/uid_util.h
+++ b/be/src/util/uid_util.h
@@ -62,6 +62,7 @@ struct UniqueId {
     int64_t lo = 0;
 
     UniqueId(int64_t hi_, int64_t lo_) : hi(hi_), lo(lo_) { }
+    UniqueId(const UniqueId& uid) : hi(uid.hi), lo(uid.lo) { }
     UniqueId(const TUniqueId& tuid) : hi(tuid.hi), lo(tuid.lo) { }
     UniqueId(const PUniqueId& puid) : hi(puid.hi()), lo(puid.lo()) { }
     UniqueId(const std::string& hi_str, const std::string& lo_str) {
@@ -87,6 +88,32 @@ struct UniqueId {
         to_hex(lo, buf + 17);
         return {buf, 33};
     }
+
+    UniqueId& operator=(const UniqueId& uid) {
+        hi = uid.hi;
+        lo = uid.lo;
+        return *this;
+    }
+
+    UniqueId& operator=(const PUniqueId& puid) {
+        hi = puid.hi();
+        lo = puid.lo();
+        return *this;
+    }
+
+    UniqueId& operator=(const TUniqueId& tuid) {
+        hi = tuid.hi;
+        lo = tuid.lo;
+        return *this;
+    }
+
+    //compare UniqueId with PUniqueId
+    bool operator==(const PUniqueId& rhs) const {
+        return hi == rhs.hi() && lo == rhs.lo();
+    }
+
+    bool operator!=(const PUniqueId& rhs) const {
+        return hi != rhs.hi() || lo != rhs.lo();
+    }
 
     // std::map std::set needs this operator
     bool operator<(const UniqueId& right) const {
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index b360534569e026..c3bab67a2340b1 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -62,3 +62,4 @@ ADD_BE_TEST(external_scan_context_mgr_test)
 ADD_BE_TEST(memory/chunk_allocator_test)
 ADD_BE_TEST(memory/system_allocator_test)
+ADD_BE_TEST(cache/partition_cache_test)
diff --git a/be/test/runtime/cache/partition_cache_test.cpp b/be/test/runtime/cache/partition_cache_test.cpp
new file mode 100644
index 00000000000000..5aef2a0e4591b9
--- /dev/null
+++ b/be/test/runtime/cache/partition_cache_test.cpp
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "util/logging.h"
+#include "util/cpu_info.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "runtime/cache/result_cache.h"
+#include "runtime/buffer_control_block.h"
+
+namespace doris {
+
+class PartitionCacheTest : public testing::Test {
+public:
+    PartitionCacheTest() {}
+    virtual ~PartitionCacheTest() {
+        // clear();
+    }
+
+protected:
+    virtual void SetUp() {}
+
+private:
+    void init_default() {
+        LOG(WARNING) << "init test default\n";
+        init(16, 4);
+    }
+    void init(int max_size, int ela_size);
+    void clear();
+    PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num);
+
+    ResultCache* _cache;
+    PUpdateCacheRequest* _update_request;
+    PCacheResponse* _update_response;
+    PFetchCacheRequest* _fetch_request;
+    PFetchCacheResult* _fetch_result;
+    PClearCacheRequest* _clear_request;
+    PCacheResponse* _clear_response;
+};
+
+void PartitionCacheTest::init(int max_size, int ela_size) {
+    LOG(WARNING) << "init test\n";
+    _cache = new ResultCache(max_size, ela_size);
+    _update_request = new PUpdateCacheRequest();
+    _update_response = new PCacheResponse();
+    _fetch_request = new PFetchCacheRequest();
+    _fetch_result = new PFetchCacheResult();
+    _clear_request = new PClearCacheRequest();
+    _clear_response = new PCacheResponse();
+}
+
+void PartitionCacheTest::clear() {
+    _cache->clear(_clear_request, _clear_response);
+    SAFE_DELETE(_cache);
+    SAFE_DELETE(_update_request);
+    SAFE_DELETE(_update_response);
+    SAFE_DELETE(_fetch_request);
+    SAFE_DELETE(_fetch_result);
+    SAFE_DELETE(_clear_request);
+    SAFE_DELETE(_clear_response);
+}
+
+void set_sql_key(PUniqueId* sql_key, int64 hi, int64 lo) {
+    sql_key->set_hi(hi);
+    sql_key->set_lo(lo);
+}
+
+PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, int part_num) {
+    LOG(WARNING) << "init data, sql_num:" << sql_num << ", part_num:" << part_num;
+    PUpdateCacheRequest* up_req = NULL;
+    PCacheResponse* up_res = NULL;
+    PCacheStatus st = PCacheStatus::DEFAULT;
+    for (int i = 1; i < sql_num + 1; i++) {
+        LOG(WARNING) << "Sql:" << i;
+        up_req = new PUpdateCacheRequest();
+        up_res = new PCacheResponse();
+        set_sql_key(up_req->mutable_sql_key(), i, i);
+        //partition
+        for (int j = part_begin; j < part_begin + part_num; j++) {
+            PCacheValue* value = up_req->add_value();
+            value->mutable_param()->set_partition_key(j);
+            value->mutable_param()->set_last_version(j);
+            value->mutable_param()->set_last_version_time(j);
+            value->set_data_size(16);
+            value->add_row("0123456789abcdef"); //16 bytes
+        }
+        _cache->update(up_req, up_res);
+        LOG(WARNING) << "finish update data";
+        st = up_res->status();
+        SAFE_DELETE(up_req);
+        SAFE_DELETE(up_res);
+    }
+    return st;
+}
+
+TEST_F(PartitionCacheTest, update_data) {
+    init_default();
+    PCacheStatus st = init_batch_data(1, 1, 1);
+    ASSERT_TRUE(st == PCacheStatus::CACHE_OK);
+    LOG(WARNING) << "clear cache";
+    clear();
+}
+
+TEST_F(PartitionCacheTest, update_over_partition) {
+    init_default();
+    PCacheStatus st = init_batch_data(1, 1, config::cache_max_partition_count + 1);
+    ASSERT_TRUE(st == PCacheStatus::PARAM_ERROR);
+    clear();
+}
+
+TEST_F(PartitionCacheTest, cache_clear) {
+    init_default();
+    init_batch_data(1, 1, 1);
+    _cache->clear(_clear_request, _clear_response);
+    ASSERT_EQ(_cache->get_cache_size(), 0);
+}
+
+TEST_F(PartitionCacheTest, fetch_simple_data) {
+    init_default();
+    init_batch_data(1, 1, 1);
+
+    LOG(WARNING) << "finish init\n";
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_param();
+    p1->set_partition_key(1);
+    p1->set_last_version(1);
+    p1->set_last_version_time(1);
+    LOG(WARNING) << "begin fetch\n";
+    _cache->fetch(_fetch_request, _fetch_result);
+    LOG(WARNING) << "finish fetch1\n";
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK);
+    ASSERT_EQ(_fetch_result->value_size(), 1);
+    ASSERT_EQ(_fetch_result->value(0).row(0), "0123456789abcdef");
+
+    LOG(WARNING) << "finish fetch2\n";
+    clear();
+    LOG(WARNING) << "finish fetch3\n";
+}
+
+TEST_F(PartitionCacheTest, fetch_not_sqlid) {
+    init_default();
+    init_batch_data(1, 1, 1);
+
+    set_sql_key(_fetch_request->mutable_sql_key(), 2, 2);
+    PCacheParam* p1 = _fetch_request->add_param();
+    p1->set_partition_key(1);
+    p1->set_last_version(1);
+    p1->set_last_version_time(1);
+    _cache->fetch(_fetch_request, _fetch_result);
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::NO_SQL_KEY);
+
+    clear();
+}
+
+TEST_F(PartitionCacheTest, fetch_range_data) {
+    init_default();
+    init_batch_data(1, 1, 3);
+
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_param();
+    p1->set_partition_key(2);
+    p1->set_last_version(2);
+    p1->set_last_version_time(2);
+    PCacheParam* p2 = _fetch_request->add_param();
+    p2->set_partition_key(3);
+    p2->set_last_version(3);
+    p2->set_last_version_time(3);
+    _cache->fetch(_fetch_request, _fetch_result);
+
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK);
+    ASSERT_EQ(_fetch_result->value_size(), 2);
+
+    clear();
+}
+
+TEST_F(PartitionCacheTest, fetch_invalid_key_range) {
+    init_default();
+    init_batch_data(1, 2, 1);
+
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_param();
+    p1->set_partition_key(1);
+    p1->set_last_version(1);
+    p1->set_last_version_time(1);
+
+    PCacheParam* p2 = _fetch_request->add_param();
+    p2->set_partition_key(2);
+    p2->set_last_version(2);
+    p2->set_last_version_time(2);
+
+    PCacheParam* p3 = _fetch_request->add_param();
+    p3->set_partition_key(3);
+    p3->set_last_version(3);
+    p3->set_last_version_time(3);
+    _cache->fetch(_fetch_request, _fetch_result);
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::INVALID_KEY_RANGE);
+    ASSERT_EQ(_fetch_result->value_size(), 0);
+    clear();
+}
+
+TEST_F(PartitionCacheTest, fetch_data_overdue) {
+    init_default();
+    init_batch_data(1, 1, 1);
+
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_param();
+    p1->set_partition_key(1);
+    //cache version is 1, request version is 2
+    p1->set_last_version(2);
+    p1->set_last_version_time(2);
+    _cache->fetch(_fetch_request, _fetch_result);
+
LOG(WARNING) << "fetch_data_overdue:" << _fetch_result->status(); + + ASSERT_TRUE(_fetch_result->status() == PCacheStatus::DATA_OVERDUE); + ASSERT_EQ(_fetch_result->value_size(), 0); + + clear(); +} + +TEST_F(PartitionCacheTest, prune_data) { + init(1,1); + init_batch_data(129, 1, 1024); // 16*1024*128=2M + ASSERT_LE(_cache->get_cache_size(), 1*1024*1024); //cache_size <= 1M +} + +} + +int main(int argc, char** argv) { + std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; + if (!doris::config::init(conffile.c_str(), false)) { + fprintf(stderr, "error read config file. \n"); + return -1; + } + doris::init_glog("be-test"); + ::testing::InitGoogleTest(&argc, argv); + doris::CpuInfo::init(); + return RUN_ALL_TESTS(); +} +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/docs/zh-CN/administrator-guide/partition_cache.md b/docs/zh-CN/administrator-guide/partition_cache.md new file mode 100644 index 00000000000000..38a0ae1799e72c --- /dev/null +++ b/docs/zh-CN/administrator-guide/partition_cache.md @@ -0,0 +1,205 @@ +# 分区缓存 + +## 需求场景 +大部分数据分析场景是写少读多,数据写入一次,多次频繁读取,比如一张报表涉及的维度和指标,数据在凌晨一次性计算好,但每天有数百甚至数千次的页面访问,因此非常适合把结果集缓存起来。在数据分析或BI应用中,存在下面的业务场景: +* **高并发场景**,Doris可以较好的支持高并发,但单台服务器无法承载太高的QPS +* **复杂图表的看板**,复杂的Dashboard或者大屏类应用,数据来自多张表,每个页面有数十个查询,虽然每个查询只有数十毫秒,但是总体查询时间会在数秒 +* **趋势分析**,给定日期范围的查询,指标按日显示,比如查询最近7天内的用户数的趋势,这类查询数据量大,查询范围广,查询时间往往需要数十秒 +* **用户重复查询**,如果产品没有防重刷机制,用户因手误或其他原因重复刷新页面,导致提交大量的重复的SQL + +以上四种场景,在应用层的解决方案,把查询结果放到Redis中,周期性的更新缓存或者用户手工刷新缓存,但是这个方案有如下问题: +* **数据不一致**,无法感知数据的更新,导致用户经常看到旧的数据 +* **命中率低**,缓存整个查询结果,如果数据实时写入,缓存频繁失效,命中率低且系统负载较重 +* **额外成本**,引入外部缓存组件,会带来系统复杂度,增加额外成本 + +## 解决方案 +本分区缓存策略可以解决上面的问题,优先保证数据一致性,在此基础上细化缓存粒度,提升命中率,因此有如下特点: +* 用户无需担心数据一致性,通过版本来控制缓存失效,缓存的数据和从BE中查询的数据是一致的 +* 没有额外的组件和成本,缓存结果存储在BE的内存中,用户可以根据需要调整缓存内存大小 +* 实现了两种缓存策略,SQLCache和PartitionCache,后者缓存粒度更细 +* 用一致性哈希解决BE节点上下线的问题,BE中的缓存算法是改进的LRU + +## SQLCache +SQLCache按SQL的签名、查询的表的分区ID、分区最新版本来存储和获取缓存。三者组合确定一个缓存数据集,任何一个变化了,如SQL有变化,如查询字段或条件不一样,或数据更新后版本变化了,会导致命中不了缓存。 + +如果多张表Join,使用最近更新的分区ID和最新的版本号,如果其中一张表更新了,会导致分区ID或版本号不一样,也一样命中不了缓存。 + +SQLCache,更适合T+1更新的场景,凌晨数据更新,首次查询从BE中获取结果放入到缓存中,后续相同查询从缓存中获取。实时更新数据也可以使用,但是可能存在命中率低的问题,可以参考如下PartitionCache。 + +## PartitionCache + +### 设计原理 +1. SQL可以并行拆分,Q = Q1 ∪ Q2 ... ∪ Qn,R= R1 ∪ R2 ... ∪ Rn,Q为查询语句,R为结果集 +2. 拆分为只读分区和可更新分区,只读分区缓存,更新分区不缓存 + +如上,查询最近7天的每天用户数,如按日期分区,数据只写当天分区,当天之外的其他分区的数据,都是固定不变的,在相同的查询SQL下,查询某个不更新分区的指标都是固定的。如下,在2020-03-09当天查询前7天的用户数,2020-03-03至2020-03-07的数据来自缓存,2020-03-08第一次查询来自分区,后续的查询来自缓存,2020-03-09因为当天在不停写入,所以来自分区。 + +因此,查询N天的数据,数据更新最近的D天,每天只是日期范围不一样相似的查询,只需要查询D个分区即可,其他部分都来自缓存,可以有效降低集群负载,减少查询时间。 + +``` +MySQL [(none)]> SELECT eventdate,count(userid) FROM testdb.appevent WHERE eventdate>="2020-03-03" AND eventdate<="2020-03-09" GROUP BY eventdate ORDER BY eventdate; ++------------+-----------------+ +| eventdate | count(`userid`) | ++------------+-----------------+ +| 2020-03-03 | 15 | +| 2020-03-04 | 20 | +| 2020-03-05 | 25 | +| 2020-03-06 | 30 | +| 2020-03-07 | 35 | +| 2020-03-08 | 40 | //第一次来自分区,后续来自缓存 +| 2020-03-09 | 25 | //来自分区 ++------------+-----------------+ +7 rows in set (0.02 sec) +``` + +在PartitionCache中,缓存第一级Key是去掉了分区条件后的SQL的128位MD5签名,下面是改写后的待签名的SQL: +``` +SELECT eventdate,count(userid) FROM testdb.appevent GROUP BY eventdate ORDER BY eventdate; +``` +缓存的第二级Key是查询结果集的分区字段的内容,比如上面查询结果的eventdate列的内容,二级Key的附属信息是分区的版本号和版本更新时间。 + +下面演示上面SQL在2020-03-09当天第一次执行的流程: +1. 
从缓存中获取数据 +``` ++------------+-----------------+ +| 2020-03-03 | 15 | +| 2020-03-04 | 20 | +| 2020-03-05 | 25 | +| 2020-03-06 | 30 | +| 2020-03-07 | 35 | ++------------+-----------------+ +``` +2. 从BE中获取数据的SQL和数据 +``` +SELECT eventdate,count(userid) FROM testdb.appevent WHERE eventdate>="2020-03-08" AND eventdate<="2020-03-09" GROUP BY eventdate ORDER BY eventdate; + ++------------+-----------------+ +| 2020-03-08 | 40 | ++------------+-----------------+ +| 2020-03-09 | 25 | ++------------+-----------------+ +``` +3. 最后发送给终端的数据 +``` ++------------+-----------------+ +| eventdate | count(`userid`) | ++------------+-----------------+ +| 2020-03-03 | 15 | +| 2020-03-04 | 20 | +| 2020-03-05 | 25 | +| 2020-03-06 | 30 | +| 2020-03-07 | 35 | +| 2020-03-08 | 40 | +| 2020-03-09 | 25 | ++------------+-----------------+ +``` +4. 发送给缓存的数据 +``` ++------------+-----------------+ +| 2020-03-08 | 40 | ++------------+-----------------+ +``` + +Partition缓存,适合按日期分区,部分分区实时更新,查询SQL较为固定。 + +分区字段也可以是其他字段,但是需要保证只有少量分区更新。 + +### 一些限制 +* 只支持OlapTable,其他存储如MySQL的表没有版本信息,无法感知数据是否更新 +* 只支持按分区字段分组,不支持按其他字段分组,按其他字段分组,该分组数据都有可能被更新,会导致缓存都失效 +* 只支持结果集的前半部分、后半部分以及全部命中缓存,不支持结果集被缓存数据分割成几个部分 + +## 使用方式 +### 开启SQLCache +fe.conf添加enable_sql_cache=true +``` +vim fe/conf/fe.conf +enable_sql_cache=true +``` +在MySQL命令行中设置变量 +``` +MySQL [(none)]> set [global] enable_sql_cache=true; +``` +注:globa是全局变量,不加指当前会话变量 + +### 开启PartitionCache +fe.conf添加enable_partition_cache=true +``` +vim fe/conf/fe.conf +enable_partition_cache=true +``` +在MySQL命令行中设置变量 +``` +MySQL [(none)]> set [global] enable_partition_cache=true; +``` + +如果同时开启了两个缓存策略,下面的参数,需要注意一下: +``` +last_version_interval_second=3600 +``` +如果分区的最新版本的时间离现在的间隔,大于last_version_interval_second,则会优先把整个查询结果缓存。如果小于这个间隔,如果符合PartitionCache的条件,则按PartitionCache数据。 + +### 监控 +FE的监控项: +``` +query_table //Query中有表的数量 +query_olap_table //Query中有Olap表的数量 +cache_mode_sql //识别缓存模式为sql的Query数量 +cache_hit_sql //模式为sql的Query命中Cache的数量 +query_mode_partition //识别缓存模式为Partition的Query数量 +cache_hit_partition //通过Partition命中的Query数量 +partition_all //Query中扫描的所有分区 +partition_hit //通过Cache命中的分区数量 + +Cache命中率 = (cache_hit_sql + cache_hit_partition) / query_olap_table +Partition命中率 = partition_hit / partition_all +``` + +BE的监控项: +``` +cache_memory_total //Cache内存大小 +cache_sql_total //Cache的SQL的数量 +cache_partition_total //Cache分区数量 + +SQL平均数据大小 = cache_memory_total / cache_sql_total +Partition平均数据大小 = cache_memory_total / cache_partition_total +``` + +其他监控: +可以从Grafana中查看BE节点的CPU和内存指标,Query统计中的Query Percentile等指标,配合Cache参数的调整来达成业务目标。 + + +### 优化参数 +FE的配置项cache_result_max_row_count,查询结果集放入缓存的最大行数,可以根据实际情况调整,但建议不要设置过大,避免过多占用内存,超过这个大小的结果集不会被缓存。 +``` +vim fe/conf/fe.conf +cache_result_max_row_count=1000 +``` + +BE最大分区数量cache_max_partition_count,指每个SQL对应的最大分区数,如果是按日期分区,能缓存2年多的数据,假如想保留更长时间的缓存,请把这个参数设置得更大,同时修改cache_result_max_row_count的参数。 +``` +vim be/conf/be.conf +cache_max_partition_count=1024 +``` + +BE中缓存内存设置,有两个参数cache_max_size和cache_elasticity_size两部分组成(单位MB),内存超过cache_max_size+cache_elasticity_size会开始清理,并把内存控制到cache_max_size以下。可以根据BE节点数量,节点内存大小,和缓存命中率来设置这两个参数。 +``` +cache_max_size=256 +cache_elasticity_size=128 +``` +计算方法: + +假如缓存10K个Query,每个Query缓存1000行,每行是128个字节,分布在10台BE上,则每个BE需要128M内存(10K*1000*128/10)。 + +## 未尽事项 +* T+1的数据,是否也可以用Partition缓存? 目前不支持 +* 类似的SQL,之前查询了2个指标,现在查询3个指标,是否可以利用2个指标的缓存? 目前不支持 +* 按日期分区,但是需要按周维度汇总数据,是否可用PartitionCache? 
目前不支持 + + + + + + + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 14fd6ef483e97e..d7c3abba1d14b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -207,6 +207,19 @@ public void setVisibleVersion(long visibleVersion, long visibleVersionTime, long this.visibleVersionHash = visibleVersionHash; } + private void SetVisibleVersion(long visibleVersion, long visibleVersionHash){ + this.visibleVersion = visibleVersion; + this.visibleVersionTime = System.currentTimeMillis(); + this.visibleVersionHash = visibleVersionHash; + } + + //Just for unit test + public void SetVisibleVersion(long visibleVersion, long visibleVersionHash, long visibleVersionTime){ + this.visibleVersion = visibleVersion; + this.visibleVersionTime = visibleVersionTime; + this.visibleVersionHash = visibleVersionHash; + } + public PartitionState getState() { return this.state; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 26fccc0c42541f..a9aab926b8c356 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1171,7 +1171,7 @@ public class Config extends ConfigBase { public static long min_clone_task_timeout_sec = 3 * 60; // 3min @ConfField(mutable = true, masterOnly = true) public static long max_clone_task_timeout_sec = 2 * 60 * 60; // 2h - + /** * If set to true, fe will enable sql result cache * This option is suitable for offline data update scenarios diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java index a75563d4c15ff8..cb5a0df0ea6a56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java @@ -47,6 +47,12 @@ public void addChildren(List n) { public ArrayList getChildren() { return children; } public void clearChildren() { children.clear(); } + public void removeNode(int i){ + if (children != null && i>=0 && i< children.size()) { + children.remove(i); + } + } + /** * Count the total number of nodes in this tree. Leaf node will return 1. * Non-leaf node will include all its children. 
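A quick sanity check of the sizing formula from partition_cache.md above — a minimal sketch, where the figures (10K cached queries, 1000 rows per query, 128 bytes per row, 10 BE nodes) are the doc's example values, not defaults:

```
#include <cstdio>

int main() {
    // Example figures from the sizing discussion in partition_cache.md.
    const double queries = 10000;        // cached queries across the cluster
    const double rows_per_query = 1000;  // capped by cache_result_max_row_count
    const double bytes_per_row = 128;
    const double be_nodes = 10;

    double per_be_mb = queries * rows_per_query * bytes_per_row / be_nodes / (1024 * 1024);
    // Prints ~122 MiB per BE, i.e. the doc's "128M" estimate; compare this
    // against cache_max_size + cache_elasticity_size when provisioning.
    printf("per-BE cache memory: %.1f MiB\n", per_be_mb);
    return 0;
}
```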
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 1705bb632ccd88..d1d621d45de73e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -59,6 +59,7 @@ public class ProfileManager {
     public static final String SQL_STATEMENT = "Sql Statement";
     public static final String USER = "User";
     public static final String DEFAULT_DB = "Default Db";
+    public static final String IS_CACHED = "IsCached";
 
     public static final ArrayList<String> PROFILE_HEADERS = new ArrayList<String>(
             Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 09e2e3cee7c20c..1514f381205c36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -65,6 +65,11 @@
 import org.apache.doris.planner.Planner;
 import org.apache.doris.proto.PQueryStatistics;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.cache.Cache;
+import org.apache.doris.qe.cache.CacheAnalyzer;
+import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
+import org.apache.doris.qe.cache.CacheBeProxy;
+import org.apache.doris.qe.cache.CacheProxy;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.RpcException;
@@ -114,6 +119,7 @@ public class StmtExecutor {
     private boolean isProxy;
     private ShowResultSet proxyResultSet = null;
     private PQueryStatistics statisticsForAuditLog;
+    private boolean isCached;
 
     // this constructor is mainly for proxy
     public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
@@ -155,6 +161,8 @@ public void initProfile(long beginTimeInNanoSecond) {
         summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
         summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase());
         summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
+        summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No");
+
         profile.addChild(summaryProfile);
         if (coord != null) {
             coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
@@ -575,6 +583,78 @@ private void handleSetStmt() {
         context.getState().setOk();
     }
 
+    private void sendChannel(MysqlChannel channel, List<CacheBeProxy.CacheValue> cacheValues, boolean hitAll)
+            throws Exception {
+        RowBatch batch = null;
+        for (CacheBeProxy.CacheValue value : cacheValues) {
+            batch = value.getRowBatch();
+            for (ByteBuffer row : batch.getBatch().getRows()) {
+                channel.sendOnePacket(row);
+            }
+            context.updateReturnRows(batch.getBatch().getRows().size());
+        }
+        if (hitAll) {
+            if (batch != null) {
+                statisticsForAuditLog = batch.getQueryStatistics();
+            }
+            context.getState().setEof();
+        }
+    }
+
+    private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel) throws Exception {
+        RowBatch batch = null;
+        CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
+        CacheMode mode = cacheAnalyzer.getCacheMode();
+        if (cacheResult != null) {
+            isCached = true;
+            if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
+                sendChannel(channel, cacheResult.getValueList(), true);
+                return true;
+            }
+            //rewrite the sql to fetch only the missing range
+            if (mode == CacheMode.Partition) {
+                if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) {
+                    sendChannel(channel, cacheResult.getValueList(), false);
+                }
+                SelectStmt newSelectStmt = cacheAnalyzer.getRewriteStmt();
+                newSelectStmt.reset();
+                analyzer = new Analyzer(context.getCatalog(), context);
+                newSelectStmt.analyze(analyzer);
+                planner = new Planner();
+                planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift());
+            }
+        }
+
+        coord = new Coordinator(context, analyzer, planner);
+        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
+                new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
+        coord.exec();
+
+        while (true) {
+            batch = coord.getNext();
+            if (batch.getBatch() != null) {
+                cacheAnalyzer.copyRowBatch(batch);
+                for (ByteBuffer row : batch.getBatch().getRows()) {
+                    channel.sendOnePacket(row);
+                }
+                context.updateReturnRows(batch.getBatch().getRows().size());
+            }
+            if (batch.isEos()) {
+                break;
+            }
+        }
+
+        if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
+            sendChannel(channel, cacheResult.getValueList(), false);
+        }
+
+        cacheAnalyzer.updateCache();
+        statisticsForAuditLog = batch.getQueryStatistics();
+        context.getState().setEof();
+        return false;
+    }
+
     // Process a select statement.
     private void handleQueryStmt() throws Exception {
         // Every time set no send flag and clean all data in buffer
@@ -595,12 +675,6 @@ private void handleQueryStmt() throws Exception {
             handleExplainStmt(explainString);
             return;
         }
-        coord = new Coordinator(context, analyzer, planner);
-
-        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
-                new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
-
-        coord.exec();
 
         // if python's MysqlDb get error after sendfields, it can't catch the exception
         // so We need to send fields after first batch arrived
@@ -619,20 +693,31 @@ private void handleQueryStmt() throws Exception {
         if (!isOutfileQuery) {
             sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
         }
+
+        //SQLCache and PartitionCache
+        CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
+        if (cacheAnalyzer.enableCache()) {
+            handleCacheStmt(cacheAnalyzer, channel);
+            return;
+        }
+
+        coord = new Coordinator(context, analyzer, planner);
+        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
+                new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
+        coord.exec();
 
         while (true) {
             batch = coord.getNext();
             // for outfile query, there will be only one empty batch send back with eos flag
             if (batch.getBatch() != null && !isOutfileQuery) {
                 for (ByteBuffer row : batch.getBatch().getRows()) {
                     channel.sendOnePacket(row);
-                }
-                context.updateReturnRows(batch.getBatch().getRows().size());
+                }
+                context.updateReturnRows(batch.getBatch().getRows().size());
             }
             if (batch.isEos()) {
                 break;
             }
         }
-        statisticsForAuditLog = batch.getQueryStatistics();
 
         if (!isOutfileQuery) {
             context.getState().setEof();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
index 9504513edf0dec..353adfcbf200a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
@@ -18,7 +18,7 @@
 package org.apache.doris.qe.cache;
 
 import org.apache.doris.analysis.SelectStmt;
-//import org.apache.doris.common.Config;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.Status;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.thrift.TUniqueId;
@@ -38,8 +38,8 @@ public enum HitRange {
     protected TUniqueId queryId;
     protected SelectStmt selectStmt;
-    //protected RowBatchBuilder rowBatchBuilder;
-    //protected CacheAnalyzer.CacheTable latestTable;
+    protected RowBatchBuilder rowBatchBuilder;
+    protected CacheAnalyzer.CacheTable latestTable;
     protected CacheProxy proxy;
     protected HitRange hitRange;
 
@@ -72,7 +72,6 @@ public HitRange getHitRange() {
     public abstract void updateCache();
 
     protected boolean checkRowLimit() {
-        /*
         if (rowBatchBuilder == null) {
             return false;
         }
@@ -82,7 +81,6 @@ protected boolean checkRowLimit() {
             return false;
         } else {
             return true;
-        }*/
-        return false;
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
new file mode 100644
index 00000000000000..765dddece7db87
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe.cache; + +import org.apache.doris.analysis.AggregateInfo; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InlineViewRef; +import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.RowBatch; +import org.apache.doris.common.Config; +import org.apache.doris.common.Status; + +import com.google.common.collect.Lists; +import org.apache.doris.thrift.TUniqueId; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Analyze which caching mode a SQL is suitable for + * 1. T + 1 update is suitable for SQL mode + * 2. Partition by date, update the data of the day in near real time, which is suitable for Partition mode + */ +public class CacheAnalyzer { + private static final Logger LOG = LogManager.getLogger(CacheAnalyzer.class); + + /** + * NoNeed : disable config or variable, not query, not scan table etc. 
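+ * None : the query was analyzed but is not cacheable (non-OLAP table, data updated too recently, etc.)
+ * TTL : reserved for time-to-live based caching; not used in this patch
+ * Sql : cache the complete result set, keyed by the query's SQL text
+ * Partition : cache one result set per partition, keyed by the SQL text plus the partition key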
+ */
+    public enum CacheMode {
+        NoNeed,
+        None,
+        TTL,
+        Sql,
+        Partition
+    }
+
+    private ConnectContext context;
+    private boolean enableSqlCache = false;
+    private boolean enablePartitionCache = false;
+    private TUniqueId queryId;
+    private CacheMode cacheMode;
+    private CacheTable latestTable;
+    private StatementBase parsedStmt;
+    private SelectStmt selectStmt;
+    private List<ScanNode> scanNodes;
+    private OlapTable olapTable;
+    private RangePartitionInfo partitionInfo;
+    private Column partColumn;
+    private CompoundPredicate partitionPredicate;
+    private Cache cache;
+
+    public Cache getCache() {
+        return cache;
+    }
+
+    public CacheAnalyzer(ConnectContext context, StatementBase parsedStmt, Planner planner) {
+        this.context = context;
+        this.queryId = context.queryId();
+        this.parsedStmt = parsedStmt;
+        scanNodes = planner.getScanNodes();
+        latestTable = new CacheTable();
+        checkCacheConfig();
+    }
+
+    // for unit test
+    public CacheAnalyzer(ConnectContext context, StatementBase parsedStmt, List<ScanNode> scanNodes) {
+        this.context = context;
+        this.parsedStmt = parsedStmt;
+        this.scanNodes = scanNodes;
+        checkCacheConfig();
+    }
+
+    private void checkCacheConfig() {
+        if (Config.cache_enable_sql_mode) {
+            if (context.getSessionVariable().isEnableSqlCache()) {
+                enableSqlCache = true;
+            }
+        }
+        if (Config.cache_enable_partition_mode) {
+            if (context.getSessionVariable().isEnablePartitionCache()) {
+                enablePartitionCache = true;
+            }
+        }
+    }
+
+    public CacheMode getCacheMode() {
+        return cacheMode;
+    }
+
+    public class CacheTable implements Comparable<CacheTable> {
+        public OlapTable olapTable;
+        public long latestId;
+        public long latestVersion;
+        public long latestTime;
+
+        public CacheTable() {
+            olapTable = null;
+            latestId = 0;
+            latestVersion = 0;
+            latestTime = 0;
+        }
+
+        @Override
+        public int compareTo(CacheTable table) {
+            // compare the longs directly; casting the difference to int can overflow
+            return Long.compare(table.latestTime, this.latestTime);
+        }
+
+        public void Debug() {
+            LOG.info("table {}, partition id {}, ver {}, time {}", olapTable.getName(), latestId, latestVersion, latestTime);
+        }
+    }
+
+    public boolean enableCache() {
+        return enableSqlCache || enablePartitionCache;
+    }
+
+    public boolean enableSqlCache() {
+        return enableSqlCache;
+    }
+
+    public boolean enablePartitionCache() {
+        return enablePartitionCache;
+    }
+
+    /**
+     * Check cache mode with SQL and table:
+     * 1. Only OLAP tables are supported
+     * 2. The table's latest update happened more than Config.cache_last_version_interval_second ago
+     * 3. PartitionType is PartitionType.RANGE, and the partition key has only one column
+     * 4. The partition key must be included in the group by clause
+     * 5. The where clause must contain exactly one predicate on the partition key
+     * CacheMode.Sql:
+     * SELECT xxx FROM user_profile, when the table was updated before the interval above
+     * CacheMode.Partition: partitioned by event_date, where only the partition of today is updated;
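+     * e.g. the following queries all qualify for Partition mode: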
+     * SELECT xxx FROM app_event WHERE event_date >= 20191201 AND event_date <= 20191207 GROUP BY event_date
+     * SELECT xxx FROM app_event INNER JOIN user_profile ON app_event.user_id = user_profile.user_id xxx
+     * SELECT xxx FROM app_event INNER JOIN user_profile ON xxx INNER JOIN site_channel ON xxx
+     */
+    public void checkCacheMode(long now) {
+        cacheMode = innerCheckCacheMode(now);
+    }
+
+    private CacheMode innerCheckCacheMode(long now) {
+        if (!enableCache()) {
+            return CacheMode.NoNeed;
+        }
+        if (!(parsedStmt instanceof SelectStmt) || scanNodes.size() == 0) {
+            return CacheMode.NoNeed;
+        }
+        MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
+
+        this.selectStmt = (SelectStmt) parsedStmt;
+        // Check the last version time of every scanned table
+        List<CacheTable> tblTimeList = Lists.newArrayList();
+        for (int i = 0; i < scanNodes.size(); i++) {
+            ScanNode node = scanNodes.get(i);
+            if (!(node instanceof OlapScanNode)) {
+                return CacheMode.None;
+            }
+            OlapScanNode oNode = (OlapScanNode) node;
+            OlapTable oTable = oNode.getOlapTable();
+            CacheTable cTable = getLastUpdateTime(oTable);
+            tblTimeList.add(cTable);
+        }
+        MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
+        Collections.sort(tblTimeList);
+        latestTable = tblTimeList.get(0);
+        latestTable.Debug();
+
+        if (now == 0) {
+            now = nowtime();
+        }
+        if (enableSqlCache() &&
+                (now - latestTable.latestTime) >= Config.cache_last_version_interval_second * 1000) {
+            cache = new SqlCache(this.queryId, this.selectStmt);
+            ((SqlCache) cache).setCacheInfo(this.latestTable);
+            MetricRepo.COUNTER_CACHE_MODE_SQL.increase(1L);
+            return CacheMode.Sql;
+        }
+
+        if (!enablePartitionCache()) {
+            return CacheMode.None;
+        }
+
+        // Check if selectStmt matches the partition key;
+        // only one table may have been updated within the cache_last_version_interval_second window
+        for (int i = 1; i < tblTimeList.size(); i++) {
+            if ((now - tblTimeList.get(i).latestTime) < Config.cache_last_version_interval_second * 1000) {
+                LOG.info("the time of other tables is newer than {}", Config.cache_last_version_interval_second);
+                return CacheMode.None;
+            }
+        }
+        olapTable = latestTable.olapTable;
+        if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) {
+            LOG.info("the partition of the OlapTable is not the RANGE type");
+            return CacheMode.None;
+        }
+        partitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
+        List<Column> columns = partitionInfo.getPartitionColumns();
+        // The partition key may have only one column
+        if (columns.size() != 1) {
+            LOG.info("the size of columns for partition key is {}", columns.size());
+            return CacheMode.None;
+        }
+        partColumn = columns.get(0);
+        // Check if the group by exprs contain the partition column
+        if (!checkGroupByPartitionKey(this.selectStmt, partColumn)) {
+            LOG.info("not group by partition key, key {}", partColumn.getName());
+            return CacheMode.None;
+        }
+        // Check if the where clause has exactly one CompoundPredicate on the partition column
+        List<CompoundPredicate> compoundPredicates = Lists.newArrayList();
+        getPartitionKeyFromSelectStmt(this.selectStmt, partColumn, compoundPredicates);
+        if (compoundPredicates.size() != 1) {
+            LOG.info("the number of predicates including the partition key is {}", compoundPredicates.size());
+            return CacheMode.None;
+        }
+        partitionPredicate = compoundPredicates.get(0);
+        cache = new PartitionCache(this.queryId, this.selectStmt);
+        ((PartitionCache) cache).setCacheInfo(this.latestTable, this.partitionInfo, this.partColumn,
+                this.partitionPredicate);
+        MetricRepo.COUNTER_CACHE_MODE_PARTITION.increase(1L);
+        return CacheMode.Partition;
+    }
+
+    public CacheBeProxy.FetchCacheResult getCacheData() {
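+        // getCacheData() is the entry point used by StmtExecutor: re-check the cache
+        // mode first so that NoNeed/None short-circuit before any RPC to the BE cache.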
+        CacheProxy.FetchCacheResult cacheResult = null;
+        cacheMode = innerCheckCacheMode(0);
+        if (cacheMode == CacheMode.NoNeed) {
+            return cacheResult;
+        }
+        if (cacheMode == CacheMode.None) {
+            LOG.info("check cache mode {}, queryid {}", cacheMode, DebugUtil.printId(queryId));
+            return cacheResult;
+        }
+        Status status = new Status();
+        cacheResult = cache.getCacheData(status);
+
+        if (status.ok() && cacheResult != null) {
+            LOG.info("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}",
+                    cacheMode, DebugUtil.printId(queryId),
+                    cacheResult.all_count, cacheResult.value_count,
+                    cacheResult.row_count, cacheResult.data_size);
+        } else {
+            LOG.info("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode,
+                    DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg());
+            cacheResult = null;
+        }
+        return cacheResult;
+    }
+
+    public long nowtime() {
+        return System.currentTimeMillis();
+    }
+
+    private void getPartitionKeyFromSelectStmt(SelectStmt stmt, Column partColumn,
+                                               List<CompoundPredicate> compoundPredicates) {
+        getPartitionKeyFromWhereClause(stmt.getWhereClause(), partColumn, compoundPredicates);
+        List<TableRef> tableRefs = stmt.getTableRefs();
+        for (TableRef tblRef : tableRefs) {
+            if (tblRef instanceof InlineViewRef) {
+                InlineViewRef viewRef = (InlineViewRef) tblRef;
+                QueryStmt queryStmt = viewRef.getViewStmt();
+                if (queryStmt instanceof SelectStmt) {
+                    getPartitionKeyFromSelectStmt((SelectStmt) queryStmt, partColumn, compoundPredicates);
+                }
+            }
+        }
+    }
+
+    /**
+     * Of the following three predicate forms, only case 1 is supported:
+     * 1. key >= a AND key <= b
+     * 2. key = a OR key = b
+     * 3. key IN (a, b, c)
+     */
+    private void getPartitionKeyFromWhereClause(Expr expr, Column partColumn,
+                                                List<CompoundPredicate> compoundPredicates) {
+        if (expr == null) {
+            return;
+        }
+        if (expr instanceof CompoundPredicate) {
+            CompoundPredicate cp = (CompoundPredicate) expr;
+            if (cp.getOp() == CompoundPredicate.Operator.AND) {
+                if (cp.getChildren().size() == 2 && cp.getChild(0) instanceof BinaryPredicate &&
+                        cp.getChild(1) instanceof BinaryPredicate) {
+                    BinaryPredicate leftPre = (BinaryPredicate) cp.getChild(0);
+                    BinaryPredicate rightPre = (BinaryPredicate) cp.getChild(1);
+                    String leftColumn = getColumnName(leftPre);
+                    String rightColumn = getColumnName(rightPre);
+                    if (leftColumn.equalsIgnoreCase(partColumn.getName()) &&
+                            rightColumn.equalsIgnoreCase(partColumn.getName())) {
+                        compoundPredicates.add(cp);
+                    }
+                }
+            }
+            for (Expr subExpr : expr.getChildren()) {
+                getPartitionKeyFromWhereClause(subExpr, partColumn, compoundPredicates);
+            }
+        }
+    }
+
+    private String getColumnName(BinaryPredicate predicate) {
+        SlotRef slot = null;
+        if (predicate.getChild(0) instanceof SlotRef) {
+            slot = (SlotRef) predicate.getChild(0);
+        } else if (predicate.getChild(0) instanceof CastExpr) {
+            CastExpr expr = (CastExpr) predicate.getChild(0);
+            if (expr.getChild(0) instanceof SlotRef) {
+                slot = (SlotRef) expr.getChild(0);
+            }
+        }
+
+        if (slot != null) {
+            return slot.getColumnName();
+        }
+        return "";
+    }
+
+    /**
+     * Check that the selectStmt and its inline views always group by the partition key:
+     * 1. There is at least one group by
+     * 2. Every group by must contain the partition key
+     * 3. The AggregateInfo cannot be a distinct aggregate
+     */
+    private boolean checkGroupByPartitionKey(SelectStmt stmt, Column partColumn) {
+        List<AggregateInfo> aggInfoList = Lists.newArrayList();
+        getAggInfoList(stmt, aggInfoList);
+        int groupbyCount = 0;
+        for (AggregateInfo aggInfo : aggInfoList) {
+            if (aggInfo.isDistinctAgg()) {
+                return false;
+            }
+            ArrayList<Expr> groupExprs = aggInfo.getGroupingExprs();
+            if (groupExprs == null) {
+                continue;
+            }
+            groupbyCount += 1;
+            boolean matched = false;
+            for (Expr groupExpr : groupExprs) {
+                // only a plain column reference can match the partition key
+                if (!(groupExpr instanceof SlotRef)) {
+                    continue;
+                }
+                SlotRef slot = (SlotRef) groupExpr;
+                if (partColumn.getName().equals(slot.getColumnName())) {
+                    matched = true;
+                    break;
+                }
+            }
+            if (!matched) {
+                return false;
+            }
+        }
+        return groupbyCount > 0;
+    }
+
+    private void getAggInfoList(SelectStmt stmt, List<AggregateInfo> aggInfoList) {
+        AggregateInfo aggInfo = stmt.getAggInfo();
+        if (aggInfo != null) {
+            aggInfoList.add(aggInfo);
+        }
+        List<TableRef> tableRefs = stmt.getTableRefs();
+        for (TableRef tblRef : tableRefs) {
+            if (tblRef instanceof InlineViewRef) {
+                InlineViewRef viewRef = (InlineViewRef) tblRef;
+                QueryStmt queryStmt = viewRef.getViewStmt();
+                if (queryStmt instanceof SelectStmt) {
+                    getAggInfoList((SelectStmt) queryStmt, aggInfoList);
+                }
+            }
+        }
+    }
+
+    private CacheTable getLastUpdateTime(OlapTable olapTable) {
+        CacheTable table = new CacheTable();
+        table.olapTable = olapTable;
+        for (Partition partition : olapTable.getPartitions()) {
+            if (partition.getVisibleVersionTime() >= table.latestTime &&
+                    partition.getVisibleVersion() > table.latestVersion) {
+                table.latestId = partition.getId();
+                table.latestTime = partition.getVisibleVersionTime();
+                table.latestVersion = partition.getVisibleVersion();
+            }
+        }
+        return table;
+    }
+
+    public Cache.HitRange getHitRange() {
+        if (cacheMode == CacheMode.None) {
+            return Cache.HitRange.None;
+        }
+        return cache.getHitRange();
+    }
+
+    public SelectStmt getRewriteStmt() {
+        if (cacheMode != CacheMode.Partition) {
+            return null;
+        }
+        return cache.getRewriteStmt();
+    }
+
+    public void copyRowBatch(RowBatch rowBatch) {
+        if (cacheMode == CacheMode.None || cacheMode == CacheMode.NoNeed) {
+            return;
+        }
+        cache.copyRowBatch(rowBatch);
+    }
+
+    public void updateCache() {
+        if (cacheMode == CacheMode.None || cacheMode == CacheMode.NoNeed) {
+            return;
+        }
+        cache.updateCache();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
index cd989709484f63..cfb03856a876bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
@@ -59,10 +59,10 @@ public void updateCache(UpdateCacheRequest request, int timeoutMs, Status status
         try {
             PUpdateCacheRequest updateRequest = request.getRpcRequest();
             Future<PCacheResponse> future = BackendServiceProxy.getInstance().updateCache(address, updateRequest);
-            PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS);
+            PCacheResponse response = future.get(10000, TimeUnit.MICROSECONDS);
             if (response.status == PCacheStatus.CACHE_OK) {
                 status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
             } else {
                 status.setStatus(response.status.toString());
             }
         } catch (Exception e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
index 4dedb16396c500..b2fd6554ca3691 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java @@ -174,7 +174,6 @@ public void debug() { } } - public static class FetchCacheRequest extends PFetchCacheRequest { private String sqlStr; private List paramList; @@ -273,7 +272,6 @@ public static CacheProxy getCacheProxy(CacheProxyType type) { public abstract void clearCache(PClearCacheRequest clearRequest); - public static PUniqueId getMd5(String str) { MessageDigest msgDigest; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java new file mode 100644 index 00000000000000..6801a50d1370f9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe.cache; + +import com.google.common.collect.Lists; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InlineViewRef; +import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.common.Status; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.qe.RowBatch; +import org.apache.doris.thrift.TUniqueId; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class PartitionCache extends Cache { + private static final Logger LOG = LogManager.getLogger(PartitionCache.class); + private SelectStmt nokeyStmt; + private SelectStmt rewriteStmt; + private CompoundPredicate partitionPredicate; + private OlapTable olapTable; + private RangePartitionInfo partitionInfo; + private Column partColumn; + + private PartitionRange range; + private List newRangeList; + + public SelectStmt getRewriteStmt() { + return rewriteStmt; + } + + public SelectStmt getNokeyStmt() { + return nokeyStmt; + } + + public PartitionCache(TUniqueId queryId, SelectStmt selectStmt) { + super(queryId, selectStmt); + } + + public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, RangePartitionInfo partitionInfo, Column partColumn, + CompoundPredicate partitionPredicate) { + this.latestTable = latestTable; + this.olapTable = latestTable.olapTable; + this.partitionInfo = partitionInfo; + this.partColumn = partColumn; + this.partitionPredicate = partitionPredicate; + this.newRangeList = Lists.newArrayList(); + } + + public 
CacheProxy.FetchCacheResult getCacheData(Status status) { + CacheProxy.FetchCacheRequest request; + rewriteSelectStmt(null); + request = new CacheBeProxy.FetchCacheRequest(nokeyStmt.toSql()); + range = new PartitionRange(this.partitionPredicate, this.olapTable, + this.partitionInfo); + if (!range.analytics()) { + status.setStatus("analytics range error"); + return null; + } + + for (PartitionRange.PartitionSingle single : range.getPartitionSingleList()) { + request.addParam(single.getCacheKey().realValue(), + single.getPartition().getVisibleVersion(), + single.getPartition().getVisibleVersionTime() + ); + } + + CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status); + if (status.ok() && cacheResult != null) { + cacheResult.all_count = range.getPartitionSingleList().size(); + for (CacheBeProxy.CacheValue value : cacheResult.getValueList()) { + range.setCacheFlag(value.param.partition_key); + } + MetricRepo.COUNTER_CACHE_HIT_PARTITION.increase(1L); + MetricRepo.COUNTER_CACHE_PARTITION_ALL.increase((long) range.getPartitionSingleList().size()); + MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValueList().size()); + } + + range.setTooNewByID(latestTable.latestId); + //build rewrite sql + this.hitRange = range.diskPartitionRange(newRangeList); + if (newRangeList != null && newRangeList.size() > 0) { + rewriteSelectStmt(newRangeList); + } + return cacheResult; + } + + public void copyRowBatch(RowBatch rowBatch) { + if (rowBatchBuilder == null) { + rowBatchBuilder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Partition); + rowBatchBuilder.partitionIndex(selectStmt.getResultExprs(), selectStmt.getColLabels(), + partColumn, range.updatePartitionRange()); + } + rowBatchBuilder.copyRowData(rowBatch); + } + + public void updateCache() { + if (!super.checkRowLimit()) { + return; + } + + CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql()); + if (updateRequest.value_count > 0) { + CacheBeProxy proxy = new CacheBeProxy(); + Status status = new Status(); + proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status); + LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}", + CacheAnalyzer.CacheMode.Partition, DebugUtil.printId(queryId), + DebugUtil.printId(updateRequest.sql_key), + updateRequest.value_count, updateRequest.row_count, updateRequest.data_size); + } + } + + /** + * Set the predicate containing partition key to null + */ + public void rewriteSelectStmt(List newRangeList) { + if (newRangeList == null || newRangeList.size() == 0) { + this.nokeyStmt = (SelectStmt) this.selectStmt.clone(); + rewriteSelectStmt(nokeyStmt, this.partitionPredicate, null); + } else { + this.rewriteStmt = (SelectStmt) this.selectStmt.clone(); + rewriteSelectStmt(rewriteStmt, this.partitionPredicate, newRangeList); + } + } + + private void rewriteSelectStmt(SelectStmt newStmt, CompoundPredicate predicate, + List newRangeList) { + newStmt.setWhereClause( + rewriteWhereClause(newStmt.getWhereClause(), predicate, newRangeList) + ); + List tableRefs = newStmt.getTableRefs(); + for (TableRef tblRef : tableRefs) { + if (tblRef instanceof InlineViewRef) { + InlineViewRef viewRef = (InlineViewRef) tblRef; + QueryStmt queryStmt = viewRef.getViewStmt(); + if (queryStmt instanceof SelectStmt) { + rewriteSelectStmt((SelectStmt) queryStmt, predicate, newRangeList); + } + } + } + } + + /** + * P1 And P2 And P3 And P4 + */ + private Expr rewriteWhereClause(Expr expr, 
CompoundPredicate predicate, + List newRangeList) { + if (expr == null) { + return null; + } + if (!(expr instanceof CompoundPredicate)) { + return expr; + } + if (expr.equals(predicate)) { + if (newRangeList == null) { + return null; + } else { + getPartitionRange().rewritePredicate((CompoundPredicate) expr, newRangeList); + return expr; + } + } + + for (int i = 0; i < expr.getChildren().size(); i++) { + Expr child = rewriteWhereClause(expr.getChild(i), predicate, newRangeList); + if (child == null) { + expr.removeNode(i); + i--; + } else { + expr.setChild(i, child); + } + } + if (expr.getChildren().size() == 0) { + return null; + } else if (expr.getChildren().size() == 1) { + return expr.getChild(0); + } else { + return expr; + } + } + + public PartitionRange getPartitionRange() { + if (range == null) { + range = new PartitionRange(this.partitionPredicate, + this.olapTable, this.partitionInfo); + return range; + } else { + return range; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java new file mode 100644 index 00000000000000..a003250fa6990d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java @@ -0,0 +1,596 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe.cache; + +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; +import org.apache.doris.planner.PartitionColumnFilter; + +import org.apache.doris.common.AnalysisException; + +import com.google.common.collect.Lists; +import com.google.common.collect.Range; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Convert the range of the partition to the list + * all partition by day/week/month split to day list + */ +public class PartitionRange { + private static final Logger LOG = LogManager.getLogger(PartitionRange.class); + + public class PartitionSingle { + private Partition partition; + private PartitionKey partitionKey; + private long partitionId; + private PartitionKeyType cacheKey; + private boolean fromCache; + private boolean tooNew; + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } + + public PartitionKey getPartitionKey() { + return partitionKey; + } + + public void setPartitionKey(PartitionKey key) { + this.partitionKey = key; + } + + public long getPartitionId() { + return partitionId; + } + + public void setPartitionId(long partitionId) { + this.partitionId = partitionId; + } + + public PartitionKeyType getCacheKey() { + return cacheKey; + } + + public void setCacheKey(PartitionKeyType cacheKey) { + this.cacheKey.clone(cacheKey); + } + + public boolean isFromCache() { + return fromCache; + } + + public void setFromCache(boolean fromCache) { + this.fromCache = fromCache; + } + + public boolean isTooNew() { + return tooNew; + } + + public void setTooNew(boolean tooNew) { + this.tooNew = tooNew; + } + + public PartitionSingle() { + this.partitionId = 0; + this.cacheKey = new PartitionKeyType(); + this.fromCache = false; + this.tooNew = false; + } + + public void Debug() { + if (partition != null) { + LOG.info("partition id {}, cacheKey {}, version {}, time {}, fromCache {}, tooNew {} ", + partitionId, cacheKey.realValue(), + partition.getVisibleVersion(), partition.getVisibleVersionTime(), + fromCache, tooNew); + } else { + LOG.info("partition id {}, cacheKey {}, fromCache {}, tooNew {} ", partitionId, + cacheKey.realValue(), fromCache, tooNew); + } + } + } + + public enum KeyType { + DEFAULT, + LONG, + DATE, + DATETIME, + TIME + } + + public static class PartitionKeyType { + private SimpleDateFormat df8 = new SimpleDateFormat("yyyyMMdd"); + private SimpleDateFormat df10 = new SimpleDateFormat("yyyy-MM-dd"); + + public KeyType keyType = KeyType.DEFAULT; + public long value; + public Date date; + + public boolean init(Type type, String str) { + if (type.getPrimitiveType() == PrimitiveType.DATE) { + try { + date = df10.parse(str); + } 
catch (Exception e) { + LOG.warn("parse error str{}.", str); + return false; + } + keyType = KeyType.DATE; + } else { + value = Long.valueOf(str); + keyType = KeyType.LONG; + } + return true; + } + + public boolean init(Type type, LiteralExpr expr) { + switch (type.getPrimitiveType()) { + case BOOLEAN: + case TIME: + case DATETIME: + case FLOAT: + case DOUBLE: + case DECIMAL: + case DECIMALV2: + case CHAR: + case VARCHAR: + case LARGEINT: + LOG.info("PartitionCache not support such key type {}", type.toSql()); + return false; + case DATE: + date = getDateValue(expr); + keyType = KeyType.DATE; + break; + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + value = expr.getLongValue(); + keyType = KeyType.LONG; + break; + } + return true; + } + + public void clone(PartitionKeyType key) { + keyType = key.keyType; + value = key.value; + date = key.date; + } + + public boolean equals(PartitionKeyType key) { + return realValue() == key.realValue(); + } + + public void add(int num) { + if (keyType == KeyType.DATE) { + date = new Date(date.getTime() + num * 3600 * 24 * 1000); + } else { + value += num; + } + } + + public String toString() { + if (keyType == KeyType.DEFAULT) { + return ""; + } else if (keyType == KeyType.DATE) { + return df10.format(date); + } else { + return String.valueOf(value); + } + } + + public long realValue() { + if (keyType == KeyType.DATE) { + return Long.parseLong(df8.format(date)); + } else { + return value; + } + } + + private Date getDateValue(LiteralExpr expr) { + value = expr.getLongValue() / 1000000; + Date dt = null; + try { + dt = df8.parse(String.valueOf(value)); + } catch (Exception e) { + } + return dt; + } + } + + private CompoundPredicate partitionKeyPredicate; + private OlapTable olapTable; + private RangePartitionInfo rangePartitionInfo; + private Column partitionColumn; + private List partitionSingleList; + + public CompoundPredicate getPartitionKeyPredicate() { + return partitionKeyPredicate; + } + + public void setPartitionKeyPredicate(CompoundPredicate partitionKeyPredicate) { + this.partitionKeyPredicate = partitionKeyPredicate; + } + + public RangePartitionInfo getRangePartitionInfo() { + return rangePartitionInfo; + } + + public void setRangePartitionInfo(RangePartitionInfo rangePartitionInfo) { + this.rangePartitionInfo = rangePartitionInfo; + } + + public Column getPartitionColumn() { + return partitionColumn; + } + + public void setPartitionColumn(Column partitionColumn) { + this.partitionColumn = partitionColumn; + } + + public List getPartitionSingleList() { + return partitionSingleList; + } + + public PartitionRange() { + } + + public PartitionRange(CompoundPredicate partitionKeyPredicate, OlapTable olapTable, + RangePartitionInfo rangePartitionInfo) { + this.partitionKeyPredicate = partitionKeyPredicate; + this.olapTable = olapTable; + this.rangePartitionInfo = rangePartitionInfo; + this.partitionSingleList = Lists.newArrayList(); + } + + /** + * analytics PartitionKey and PartitionInfo + * + * @return + */ + public boolean analytics() { + if (rangePartitionInfo.getPartitionColumns().size() != 1) { + return false; + } + partitionColumn = rangePartitionInfo.getPartitionColumns().get(0); + PartitionColumnFilter filter = createPartitionFilter(this.partitionKeyPredicate, partitionColumn); + try { + if (!buildPartitionKeyRange(filter, partitionColumn)) { + return false; + } + getTablePartitionList(olapTable); + } catch (AnalysisException e) { + LOG.warn("get partition range failed, because:", e); + return false; + } + return true; + } + + 
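+    /**
+     * Mark the partition whose cache key matches cacheKey as served from cache,
+     * so that diskPartitionRange() excludes it from the range re-read from disk.
+     */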
+    public boolean setCacheFlag(long cacheKey) {
+        boolean find = false;
+        for (PartitionSingle single : partitionSingleList) {
+            if (single.getCacheKey().realValue() == cacheKey) {
+                single.setFromCache(true);
+                find = true;
+                break;
+            }
+        }
+        return find;
+    }
+
+    public boolean setTooNewByID(long partitionId) {
+        boolean find = false;
+        for (PartitionSingle single : partitionSingleList) {
+            if (single.getPartition().getId() == partitionId) {
+                single.setTooNew(true);
+                find = true;
+                break;
+            }
+        }
+        return find;
+    }
+
+    public boolean setTooNewByKey(long cacheKey) {
+        boolean find = false;
+        for (PartitionSingle single : partitionSingleList) {
+            if (single.getCacheKey().realValue() == cacheKey) {
+                single.setTooNew(true);
+                find = true;
+                break;
+            }
+        }
+        return find;
+    }
+
+    /**
+     * Only a hit at the left or right end of the range is supported, not in the middle.
+     * e.g. for 20200113-20200115, a cached 20200113 or 20200115 can be reused, but a cached 20200114 alone cannot.
+     */
+    public Cache.HitRange diskPartitionRange(List<PartitionSingle> rangeList) {
+        Cache.HitRange hitRange = Cache.HitRange.None;
+        if (partitionSingleList.size() == 0) {
+            return hitRange;
+        }
+        int begin = partitionSingleList.size() - 1;
+        int end = 0;
+        for (int i = 0; i < partitionSingleList.size(); i++) {
+            if (!partitionSingleList.get(i).isFromCache()) {
+                if (begin > i) {
+                    begin = i;
+                }
+                if (end < i) {
+                    end = i;
+                }
+            }
+        }
+        if (end < begin) {
+            hitRange = Cache.HitRange.Full;
+            return hitRange;
+        }
+
+        if (end == partitionSingleList.size() - 1) {
+            hitRange = Cache.HitRange.Left;
+        }
+        if (begin == 0) {
+            hitRange = Cache.HitRange.Right;
+        }
+
+        rangeList.add(partitionSingleList.get(begin));
+        rangeList.add(partitionSingleList.get(end));
+        LOG.info("the new range for the BE scan is [{},{}], hit range {}", rangeList.get(0).getCacheKey().realValue(),
+                rangeList.get(1).getCacheKey().realValue(), hitRange);
+        return hitRange;
+    }
+
+    public List<PartitionSingle> updatePartitionRange() {
+        List<PartitionSingle> newList = Lists.newArrayList();
+        for (PartitionSingle single : partitionSingleList) {
+            if (!single.isFromCache() && !single.isTooNew()) {
+                newList.add(single);
+            }
+        }
+        return newList;
+    }
+
+    public boolean rewritePredicate(CompoundPredicate predicate, List<PartitionSingle> rangeList) {
+        if (predicate.getOp() != CompoundPredicate.Operator.AND) {
+            LOG.debug("predicate op {}", predicate.getOp().toString());
+            return false;
+        }
+        for (Expr expr : predicate.getChildren()) {
+            if (expr instanceof BinaryPredicate) {
+                BinaryPredicate binPredicate = (BinaryPredicate) expr;
+                BinaryPredicate.Operator op = binPredicate.getOp();
+                if (binPredicate.getChildren().size() != 2) {
+                    LOG.info("binary predicate children size {}", binPredicate.getChildren().size());
+                    continue;
+                }
+                if (op == BinaryPredicate.Operator.NE) {
+                    LOG.info("binary predicate op {}", op.toString());
+                    continue;
+                }
+                PartitionKeyType key = new PartitionKeyType();
+                switch (op) {
+                    case LE: //<=
+                        key.clone(rangeList.get(1).getCacheKey());
+                        break;
+                    case LT: //<
+                        key.clone(rangeList.get(1).getCacheKey());
+                        key.add(1);
+                        break;
+                    case GE: //>=
+                        key.clone(rangeList.get(0).getCacheKey());
+                        break;
+                    case GT: //>
+                        key.clone(rangeList.get(0).getCacheKey());
+                        key.add(-1);
+                        break;
+                    default:
+                        break;
+                }
+                LiteralExpr newLiteral;
+                if (key.keyType == KeyType.DATE) {
+                    try {
+                        newLiteral = new DateLiteral(key.toString(), Type.DATE);
+                    } catch (Exception e) {
+                        LOG.warn("the date format is wrong {},{}", key.toString(), e);
+                        continue;
+                    }
+                } else if (key.keyType == KeyType.LONG) {
+                    newLiteral = new IntLiteral(key.realValue());
+                } else {
+                    LOG.warn("the partition cache does not support type {}", key.keyType);
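+                    // unsupported key type: keep the original predicate and skip the rewrite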
+                    continue;
+                }
+
+                if (binPredicate.getChild(1) instanceof LiteralExpr) {
+                    binPredicate.removeNode(1);
+                    binPredicate.addChild(newLiteral);
+                } else if (binPredicate.getChild(0) instanceof LiteralExpr) {
+                    binPredicate.setChild(0, newLiteral);
+                } else {
+                    continue;
+                }
+            } else if (expr instanceof InPredicate) {
+                InPredicate inPredicate = (InPredicate) expr;
+                if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
+                    continue;
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Get the partition info for the SQL predicate from the OlapTable,
+     * mapping each partition key to its partition id, e.g.
+     * PARTITION BY RANGE(`olap_date`)
+     * ( PARTITION p20200101 VALUES [("20200101"), ("20200102")),
+     *   PARTITION p20200102 VALUES [("20200102"), ("20200103")) )
+     */
+    private void getTablePartitionList(OlapTable table) {
+        Map<Long, Range<PartitionKey>> range = rangePartitionInfo.getIdToRange(false);
+        for (Map.Entry<Long, Range<PartitionKey>> entry : range.entrySet()) {
+            Long partId = entry.getKey();
+            for (PartitionSingle single : partitionSingleList) {
+                if (entry.getValue().contains(single.getPartitionKey())) {
+                    if (single.getPartitionId() == 0) {
+                        single.setPartitionId(partId);
+                    }
+                }
+            }
+        }
+
+        for (PartitionSingle single : partitionSingleList) {
+            single.setPartition(table.getPartition(single.getPartitionId()));
+        }
+    }
+
+    /**
+     * Get the value range of the partition column from the predicate
+     */
+    private boolean buildPartitionKeyRange(PartitionColumnFilter partitionColumnFilter,
+                                           Column partitionColumn) throws AnalysisException {
+        if (partitionColumnFilter.lowerBound == null || partitionColumnFilter.upperBound == null) {
+            LOG.info("the lower or upper bound of the partition filter is null");
+            return false;
+        }
+        PartitionKeyType begin = new PartitionKeyType();
+        PartitionKeyType end = new PartitionKeyType();
+        if (!begin.init(partitionColumn.getType(), partitionColumnFilter.lowerBound) ||
+                !end.init(partitionColumn.getType(), partitionColumnFilter.upperBound)) {
+            return false;
+        }
+
+        if (!partitionColumnFilter.lowerBoundInclusive) {
+            begin.add(1);
+        }
+        if (!partitionColumnFilter.upperBoundInclusive) {
+            end.add(-1);
+        }
+        if (begin.realValue() > end.realValue()) {
+            LOG.info("partition range begin {}, end {}", begin, end);
+            return false;
+        }
+
+        if (end.realValue() - begin.realValue() > Config.cache_result_max_row_count) {
+            LOG.info("the partition key range is too large, begin {}, end {}", begin.realValue(), end.realValue());
+            return false;
+        }
+
+        while (begin.realValue() <= end.realValue()) {
+            PartitionKey key = PartitionKey.createPartitionKey(
+                    Lists.newArrayList(new PartitionValue(begin.toString())),
+                    Lists.newArrayList(partitionColumn));
+            PartitionSingle single = new PartitionSingle();
+            single.setCacheKey(begin);
+            single.setPartitionKey(key);
+            partitionSingleList.add(single);
+            begin.add(1);
+        }
+        return true;
+    }
+
+    private PartitionColumnFilter createPartitionFilter(CompoundPredicate partitionKeyPredicate,
+                                                        Column partitionColumn) {
+        if (partitionKeyPredicate.getOp() != CompoundPredicate.Operator.AND) {
+            LOG.debug("not an AND op");
+            return null;
+        }
+        PartitionColumnFilter partitionColumnFilter = new PartitionColumnFilter();
+        for (Expr expr : partitionKeyPredicate.getChildren()) {
+            if (expr instanceof BinaryPredicate) {
+                BinaryPredicate binPredicate = (BinaryPredicate) expr;
+                BinaryPredicate.Operator op = binPredicate.getOp();
+                if (binPredicate.getChildren().size() != 2) {
+                    LOG.warn("child size {}", binPredicate.getChildren().size());
+                    continue;
+                }
+                if (binPredicate.getOp() == BinaryPredicate.Operator.NE) {
+                    LOG.debug("the NE operator is not supported");
+                    continue;
+                }
+                Expr slotBinding;
+                if
(binPredicate.getChild(1) instanceof LiteralExpr) { + slotBinding = binPredicate.getChild(1); + } else if (binPredicate.getChild(0) instanceof LiteralExpr) { + slotBinding = binPredicate.getChild(0); + } else { + LOG.debug("not find LiteralExpr"); + continue; + } + + LiteralExpr literal = (LiteralExpr) slotBinding; + switch (op) { + case EQ: //= + partitionColumnFilter.setLowerBound(literal, true); + partitionColumnFilter.setUpperBound(literal, true); + break; + case LE: //<= + partitionColumnFilter.setUpperBound(literal, true); + break; + case LT: //< + partitionColumnFilter.setUpperBound(literal, false); + break; + case GE: //>= + partitionColumnFilter.setLowerBound(literal, true); + + break; + case GT: //> + partitionColumnFilter.setLowerBound(literal, false); + break; + default: + break; + } + } else if (expr instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) expr; + if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) { + continue; + } + partitionColumnFilter.setInPredicate(inPredicate); + } + } + return partitionColumnFilter; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java new file mode 100644 index 00000000000000..c5597c2549f420 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.qe.cache;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.qe.RowBatch;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+public class RowBatchBuilder {
+    private static final Logger LOG = LogManager.getLogger(RowBatchBuilder.class);
+
+    private CacheBeProxy.UpdateCacheRequest updateRequest;
+    private CacheAnalyzer.CacheMode cacheMode;
+    private int keyIndex;
+    private Type keyType;
+    private HashMap<Long, PartitionRange.PartitionSingle> cachePartMap;
+    private List<byte[]> rowList;
+    private int batchSize;
+    private int rowSize;
+    private int dataSize;
+
+    public int getRowSize() {
+        return rowSize;
+    }
+
+    public RowBatchBuilder(CacheAnalyzer.CacheMode model) {
+        cacheMode = model;
+        keyIndex = 0;
+        keyType = Type.INVALID;
+        rowList = Lists.newArrayList();
+        cachePartMap = new HashMap<>();
+        batchSize = 0;
+        rowSize = 0;
+        dataSize = 0;
+    }
+
+    public void partitionIndex(ArrayList<Expr> resultExpr,
+                               List<String> columnLabel, Column partColumn,
+                               List<PartitionRange.PartitionSingle> newSingleList) {
+        if (cacheMode != CacheAnalyzer.CacheMode.Partition) {
+            return;
+        }
+
+        for (int i = 0; i < columnLabel.size(); i++) {
+            if (columnLabel.get(i).equalsIgnoreCase(partColumn.getName())) {
+                keyType = resultExpr.get(i).getType();
+                keyIndex = i;
+                break;
+            }
+        }
+        if (newSingleList != null) {
+            for (PartitionRange.PartitionSingle single : newSingleList) {
+                cachePartMap.put(single.getCacheKey().realValue(), single);
+            }
+        } else {
+            LOG.info("no new partition single list");
+        }
+    }
+
+    public void copyRowData(RowBatch rowBatch) {
+        batchSize++;
+        rowSize += rowBatch.getBatch().getRowsSize();
+        for (ByteBuffer buf : rowBatch.getBatch().getRows()) {
+            byte[] bytes = Arrays.copyOfRange(buf.array(), buf.position(), buf.limit());
+            dataSize += bytes.length;
+            rowList.add(bytes);
+        }
+    }
+
+    public CacheBeProxy.UpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long latestTime) {
+        if (updateRequest == null) {
+            updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
+        }
+        updateRequest.addValue(partitionKey, lastVersion, latestTime, rowList);
+        return updateRequest;
+    }
+
+    /**
+     * Extract the partition key at keyIndex from a row encoded as
+     * length-prefixed column values.
+     */
+    public PartitionRange.PartitionKeyType getKeyFromRow(byte[] row, int index, Type type) {
+        PartitionRange.PartitionKeyType key = new PartitionRange.PartitionKeyType();
+        ByteBuffer buf = ByteBuffer.wrap(row);
+        int len;
+        for (int i = 0; i <= index; i++) {
+            len = buf.get();
+            if (i < index) {
+                buf.position(buf.position() + len);
+            }
+            if (i == index) {
+                byte[] content = Arrays.copyOfRange(buf.array(), buf.position(), buf.position() + len);
+                String str = new String(content);
+                key.init(type, str);
+            }
+        }
+        return key;
+    }
+
+    /**
+     * Split the accumulated RowBatch rows by partition key, one cache value per partition
+     */
+    public CacheBeProxy.UpdateCacheRequest buildPartitionUpdateRequest(String sql) {
+        if (updateRequest == null) {
+            updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
+        }
+        HashMap<Long, List<byte[]>> partRowMap = new HashMap<>();
+        List<byte[]> partitionRowList;
+        PartitionRange.PartitionKeyType cacheKey;
+        for (byte[] row : rowList) {
+            cacheKey = getKeyFromRow(row, keyIndex, keyType);
+            if (!cachePartMap.containsKey(cacheKey.realValue())) {
+                LOG.info("can't find partition key {}", cacheKey.realValue());
+                continue;
+            }
+            if (!partRowMap.containsKey(cacheKey.realValue())) {
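+                // first row seen for this partition key: start a new row list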
+                partitionRowList = Lists.newArrayList();
+                partitionRowList.add(row);
+                partRowMap.put(cacheKey.realValue(), partitionRowList);
+            } else {
+                partRowMap.get(cacheKey.realValue()).add(row);
+            }
+        }
+
+        for (HashMap.Entry<Long, List<byte[]>> entry : partRowMap.entrySet()) {
+            Long key = entry.getKey();
+            PartitionRange.PartitionSingle partition = cachePartMap.get(key);
+            partitionRowList = entry.getValue();
+            updateRequest.addValue(key, partition.getPartition().getVisibleVersion(),
+                    partition.getPartition().getVisibleVersionTime(), partitionRowList);
+        }
+        return updateRequest;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
new file mode 100644
index 00000000000000..43d12b02ff09d8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.RowBatch;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class SqlCache extends Cache {
+    private static final Logger LOG = LogManager.getLogger(SqlCache.class);
+
+    public SqlCache(TUniqueId queryId, SelectStmt selectStmt) {
+        super(queryId, selectStmt);
+    }
+
+    public void setCacheInfo(CacheAnalyzer.CacheTable latestTable) {
+        this.latestTable = latestTable;
+    }
+
+    public CacheProxy.FetchCacheResult getCacheData(Status status) {
+        CacheProxy.FetchCacheRequest request = new CacheProxy.FetchCacheRequest(selectStmt.toSql());
+        request.addParam(latestTable.latestId, latestTable.latestVersion,
+                latestTable.latestTime);
+        CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
+        if (status.ok() && cacheResult != null) {
+            cacheResult.all_count = 1;
+            MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
+            hitRange = HitRange.Full;
+        }
+        return cacheResult;
+    }
+
+    public SelectStmt getRewriteStmt() {
+        return null;
+    }
+
+    public void copyRowBatch(RowBatch rowBatch) {
+        if (rowBatchBuilder == null) {
+            rowBatchBuilder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Sql);
+        }
+        rowBatchBuilder.copyRowData(rowBatch);
+    }
+
+    public void updateCache() {
+        if (!super.checkRowLimit()) {
+            return;
+        }
+
+        CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(),
+                latestTable.latestId, latestTable.latestVersion, latestTable.latestTime);
+        if (updateRequest.value_count > 0) {
+            CacheBeProxy proxy = new CacheBeProxy();
+            Status status = new Status();
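+            // send the accumulated rows to the BE node chosen for this SQL key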
proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status); + LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}", + CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.sql_key), + updateRequest.value_count, updateRequest.row_count, updateRequest.data_size); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java new file mode 100644 index 00000000000000..ae2d2b38abb0e3 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java @@ -0,0 +1,859 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TStorageType; + +import org.apache.doris.qe.ConnectScheduler; +import org.apache.doris.qe.cache.Cache; +import org.apache.doris.qe.cache.CacheCoordinator; +import org.apache.doris.qe.cache.PartitionCache; +import org.apache.doris.qe.cache.PartitionRange; +import org.apache.doris.qe.cache.CacheAnalyzer; +import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; +import org.apache.doris.qe.cache.RowBatchBuilder; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.analysis.SetPassVar; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.Planner; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.system.Backend; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.proto.PUniqueId; +import org.apache.doris.alter.SchemaChangeHandler; +import org.apache.doris.catalog.BrokerMgr; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.MaterializedIndex.IndexState; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RandomDistributionInfo; +import 
org.apache.doris.catalog.SinglePartitionInfo; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.load.Load; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.mysql.privilege.MockedAuth; +import org.apache.doris.mysql.MysqlChannel; +import org.apache.doris.mysql.MysqlSerializer; +import org.apache.doris.persist.EditLog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TUniqueId; + +import mockit.Mocked; +import mockit.Tested; +import mockit.Injectable; +import mockit.Expectations; +import org.apache.doris.common.jmockit.Deencapsulation; + +import com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.StringReader; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.List; + +public class PartitionCacheTest { + private static final Logger LOG = LogManager.getLogger(PartitionCacheTest.class); + public static String clusterName = "testCluster"; + public static String dbName = "testDb"; + public static String fullDbName = "testCluster:testDb"; + public static String tableName = "testTbl"; + public static String userName = "testUser"; + + private static ConnectContext context; + + private List newRangeList; + private Cache.HitRange hitRange; + private Analyzer analyzer; + private Database db; + + @Mocked + private PaloAuth auth; + @Mocked + private SystemInfoService service; + @Mocked + private Catalog catalog; + @Mocked + private ConnectContext ctx; + @Mocked + MysqlChannel channel; + @Mocked + ConnectScheduler scheduler; + + @BeforeClass + public static void start() { + MetricRepo.init(); + try { + FrontendOptions.init(); + context = new ConnectContext(null); + Config.cache_enable_sql_mode = true; + Config.cache_enable_partition_mode = true; + context.getSessionVariable().setEnableSqlCache(true); + context.getSessionVariable().setEnablePartitionCache(true); + Config.cache_last_version_interval_second = 7200; + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + @Before + public void setUp() { + MockedAuth.mockedAuth(auth); + MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1"); + + db = new Database(1L, fullDbName); + OlapTable tbl1 = createOrderTable(); + OlapTable tbl2 = createProfileTable(); + OlapTable tbl3 = createEventTable(); + db.createTable(tbl1); + db.createTable(tbl2); + db.createTable(tbl3); + + new Expectations(catalog) { + { + catalog.getAuth(); + minTimes = 0; + result = auth; + + Deencapsulation.invoke(Catalog.class, "getCurrentSystemInfo"); + minTimes = 0; + result = service; + + catalog.getDb(fullDbName); + minTimes = 0; + result = db; + + catalog.getDb(dbName); + minTimes = 0; + result = db; + + catalog.getDb(db.getId()); + minTimes = 0; + result = db; + + catalog.getDbNames(); + minTimes = 0; + result = Lists.newArrayList(fullDbName); + } + }; + + QueryState state = new QueryState(); + channel.reset(); + + new Expectations(ctx) { + { + ctx.getMysqlChannel(); + minTimes = 0; + result = channel; + + ctx.getClusterName(); + minTimes = 0; + result = clusterName; + + 
ctx.getSerializer(); + minTimes = 0; + result = MysqlSerializer.newInstance(); + + ctx.getCatalog(); + minTimes = 0; + result = catalog; + + ctx.getState(); + minTimes = 0; + result = state; + + ctx.getConnectScheduler(); + minTimes = 0; + result = scheduler; + + ctx.getConnectionId(); + minTimes = 0; + result = 1; + + ctx.getQualifiedUser(); + minTimes = 0; + result = userName; + + ctx.getForwardedStmtId(); + minTimes = 0; + result = 123L; + + ctx.setKilled(); + minTimes = 0; + ctx.updateReturnRows(anyInt); + minTimes = 0; + ctx.setQueryId((TUniqueId) any); + minTimes = 0; + + ctx.queryId(); + minTimes = 0; + result = new TUniqueId(); + + ctx.getStartTime(); + minTimes = 0; + result = 0L; + + ctx.getDatabase(); + minTimes = 0; + result = dbName; + + SessionVariable sessionVariable = new SessionVariable(); + ctx.getSessionVariable(); + minTimes = 0; + result = sessionVariable; + + ctx.setStmtId(anyLong); + minTimes = 0; + + ctx.getStmtId(); + minTimes = 0; + result = 1L; + } + }; + + analyzer = new Analyzer(catalog, ctx); + newRangeList = Lists.newArrayList(); + } + + private void test1(){ + new Expectations(catalog) { + { + catalog.getAuth(); + result = auth; + } + }; + } + + private OlapTable createOrderTable() { + Column column1 = new Column("date", ScalarType.INT); + Column column2 = new Column("id", ScalarType.INT); + Column column3 = new Column("value", ScalarType.INT); + List columns = Lists.newArrayList(column1, column2, column3); + + MaterializedIndex baseIndex = new MaterializedIndex(10001, IndexState.NORMAL); + RandomDistributionInfo distInfo = new RandomDistributionInfo(10); + + PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column1)); + + Partition part12 = new Partition(20200112, "p20200112", baseIndex, distInfo); + part12.SetVisibleVersion(1,1,1578762000000L); //2020-01-12 1:00:00 + Partition part13 = new Partition(20200113, "p20200113", baseIndex, distInfo); + part13.SetVisibleVersion(1,1,1578848400000L); //2020-01-13 1:00:00 + Partition part14 = new Partition(20200114, "p20200114", baseIndex, distInfo); + part14.SetVisibleVersion(1,1,1578934800000L); //2020-01-14 1:00:00 + Partition part15 = new Partition(20200115, "p20200115", baseIndex, distInfo); + part15.SetVisibleVersion(2,2,1579053661000L); //2020-01-15 10:01:01 + + OlapTable table = new OlapTable(10000L, "order", columns,KeysType.DUP_KEYS, partInfo, distInfo); + + short shortKeyColumnCount = 1; + table.setIndexMeta(1000, "group1", columns, 1,1,shortKeyColumnCount,TStorageType.COLUMN, KeysType.AGG_KEYS); + + List column = Lists.newArrayList(); + column.add(column1); + + table.setIndexMeta(new Long(2), "test", column, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS); + Deencapsulation.setField(table, "baseIndexId", 1000); + + table.addPartition(part12); + table.addPartition(part13); + table.addPartition(part14); + table.addPartition(part15); + + //table.setIndexSchemaInfo(baseIndex.getId(), "order", columns, 0, 1, (short) 1); + table.setBaseIndexId(baseIndex.getId()); + + return table; + } + + private ScanNode createOrderScanNode(){ + OlapTable table = createOrderTable(); + TupleDescriptor desc = new TupleDescriptor(new TupleId(10004)); + desc.setTable(table); + ScanNode node = new OlapScanNode(new PlanNodeId(10008), desc, "ordernode"); + return node; + } + + private OlapTable createProfileTable() { + Column column1 = new Column("eventdate", ScalarType.DATE); + Column column2 = new Column("userid", ScalarType.INT); + Column column3 = new Column("country", ScalarType.INT); + List 
+    private ScanNode createOrderScanNode() {
+        OlapTable table = createOrderTable();
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(10004));
+        desc.setTable(table);
+        ScanNode node = new OlapScanNode(new PlanNodeId(10008), desc, "ordernode");
+        return node;
+    }
+
+    private OlapTable createProfileTable() {
+        Column column1 = new Column("eventdate", ScalarType.DATE);
+        Column column2 = new Column("userid", ScalarType.INT);
+        Column column3 = new Column("country", ScalarType.INT);
+        List<Column> columns = Lists.newArrayList(column1, column2, column3);
+
+        MaterializedIndex baseIndex = new MaterializedIndex(20001, IndexState.NORMAL);
+        RandomDistributionInfo distInfo = new RandomDistributionInfo(10);
+
+        PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column1));
+
+        Partition part12 = new Partition(2020112, "p20200112", baseIndex, distInfo);
+        part12.SetVisibleVersion(1, 1, 1578762000000L); //2020-01-12 01:00:00
+        Partition part13 = new Partition(2020113, "p20200113", baseIndex, distInfo);
+        part13.SetVisibleVersion(1, 1, 1578848400000L); //2020-01-13 01:00:00
+        Partition part14 = new Partition(2020114, "p20200114", baseIndex, distInfo);
+        part14.SetVisibleVersion(1, 1, 1578934800000L); //2020-01-14 01:00:00
+        Partition part15 = new Partition(2020115, "p20200115", baseIndex, distInfo);
+        part15.SetVisibleVersion(2, 2, 1579021200000L); //2020-01-15 01:00:00
+
+        OlapTable table = new OlapTable(20000L, "userprofile", columns, KeysType.DUP_KEYS, partInfo, distInfo);
+
+        short shortKeyColumnCount = 1;
+        table.setIndexMeta(1000, "group1", columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS);
+
+        List<Column> column = Lists.newArrayList();
+        column.add(column1);
+
+        table.setIndexMeta(new Long(2), "test", column, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS);
+        Deencapsulation.setField(table, "baseIndexId", 1000);
+
+        table.addPartition(part12);
+        table.addPartition(part13);
+        table.addPartition(part14);
+        table.addPartition(part15);
+
+        //table.setIndexSchemaInfo(baseIndex.getId(), "userprofile", columns, 0, 1, (short) 1);
+        table.setBaseIndexId(baseIndex.getId());
+
+        return table;
+    }
+
+    private ScanNode createProfileScanNode() {
+        OlapTable table = createProfileTable();
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(20004));
+        desc.setTable(table);
+        ScanNode node = new OlapScanNode(new PlanNodeId(20008), desc, "userprofilenode");
+        return node;
+    }
+
+    /**
+     * table appevent(date(pk), userid, eventid, eventtime), stream load every 5 minutes
+     */
+    private OlapTable createEventTable() {
+        Column column1 = new Column("eventdate", ScalarType.DATE);
+        Column column2 = new Column("userid", ScalarType.INT);
+        Column column3 = new Column("eventid", ScalarType.INT);
+        Column column4 = new Column("eventtime", ScalarType.DATETIME);
+        List<Column> columns = Lists.newArrayList(column1, column2, column3, column4);
+        PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column1));
+        MaterializedIndex baseIndex = new MaterializedIndex(30001, IndexState.NORMAL);
+        RandomDistributionInfo distInfo = new RandomDistributionInfo(10);
+
+        Partition part12 = new Partition(20200112, "p20200112", baseIndex, distInfo);
+        part12.SetVisibleVersion(1, 1, 1578762000000L); //2020-01-12 01:00:00
+        Partition part13 = new Partition(20200113, "p20200113", baseIndex, distInfo);
+        part13.SetVisibleVersion(1, 1, 1578848400000L); //2020-01-13 01:00:00
+        Partition part14 = new Partition(20200114, "p20200114", baseIndex, distInfo);
+        part14.SetVisibleVersion(1, 1, 1578934800000L); //2020-01-14 01:00:00
+        Partition part15 = new Partition(20200115, "p20200115", baseIndex, distInfo);
+        part15.SetVisibleVersion(2, 2, 1579053661000L); //2020-01-15 10:01:01
+
+        OlapTable table = new OlapTable(30000L, "appevent", columns, KeysType.DUP_KEYS, partInfo, distInfo);
+
+        short shortKeyColumnCount = 1;
+        table.setIndexMeta(1000, "group1", columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS);
+
+        List<Column> column = Lists.newArrayList();
+        column.add(column1);
+
+        table.setIndexMeta(new Long(2), "test", column, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS);
+        Deencapsulation.setField(table, "baseIndexId", 1000);
+
+        table.addPartition(part12);
+        table.addPartition(part13);
+        table.addPartition(part14);
+        table.addPartition(part15);
+
+        //table.setIndexSchemaInfo(baseIndex.getId(), "appevent", columns, 0, 1, (short) 1);
+        table.setBaseIndexId(baseIndex.getId());
+
+        return table;
+    }
+
+    private ScanNode createEventScanNode() {
+        OlapTable table = createEventTable();
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(30002));
+        desc.setTable(table);
+        ScanNode node = new OlapScanNode(new PlanNodeId(30004), desc, "appeventnode");
+        return node;
+    }
+
+    private StatementBase parseSql(String sql) {
+        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(sql)));
+        StatementBase parseStmt = null;
+        try {
+            parseStmt = SqlParserUtils.getFirstStmt(parser);
+            parseStmt.analyze(analyzer);
+        } catch (AnalysisException e) {
+            LOG.warn("Part,an_ex={}", e);
+            Assert.fail(e.getMessage());
+        } catch (UserException e) {
+            LOG.warn("Part,ue_ex={}", e);
+            Assert.fail(e.getMessage());
+        } catch (Exception e) {
+            LOG.warn("Part,cm_ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+        return parseStmt;
+    }
+
+    @Test
+    public void testCacheNode() throws Exception {
+        CacheCoordinator cp = CacheCoordinator.getInstance();
+        cp.DebugModel = true;
+        Backend bd1 = new Backend(1, "", 1000);
+        bd1.updateOnce(0, 0, 0);
+        Backend bd2 = new Backend(2, "", 2000);
+        bd2.updateOnce(0, 0, 0);
+        Backend bd3 = new Backend(3, "", 3000);
+        bd3.updateOnce(0, 0, 0);
+        cp.addBackend(bd1);
+        cp.addBackend(bd2);
+        cp.addBackend(bd3);
+
+        PUniqueId key1 = new PUniqueId();
+        key1.hi = 1L;
+        key1.lo = 1L;
+        Backend bk = cp.findBackend(key1);
+        Assert.assertNotNull(bk);
+        Assert.assertEquals(bk.getId(), 3);
+
+        key1.hi = 669560558156283345L;
+        key1.lo = 1L;
+        bk = cp.findBackend(key1);
+        Assert.assertNotNull(bk);
+        Assert.assertEquals(bk.getId(), 1);
+    }
+
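testCacheNode only pins down the observable contract of CacheCoordinator.findBackend: a given PUniqueId must always resolve to the same live backend. A minimal, self-contained sketch of such deterministic key-to-node routing (a hash ring; the coordinator's actual hashing and virtual-node layout are not shown in this hunk and may differ):

import java.util.Map;
import java.util.TreeMap;

// Hash-ring sketch: the same key hash always lands on the same node,
// and keys move predictably when backends are added or removed.
class BackendRing {
    private final TreeMap<Long, Long> ring = new TreeMap<>(); // ring position -> backendId

    void addBackend(long backendId) {
        ring.put(mix(backendId), backendId);
    }

    long findBackend(long keyHash) {
        // First node clockwise from the key's position, wrapping around the ring.
        Map.Entry<Long, Long> e = ring.ceilingEntry(mix(keyHash));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    private static long mix(long v) { // cheap stand-in for a real hash function
        v ^= v >>> 33;
        v *= 0xff51afd7ed558ccdL;
        v ^= v >>> 33;
        return v;
    }
}
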
+    @Test
+    public void testCacheModeNone() throws Exception {
+        StatementBase parseStmt = parseSql("select @@version_comment limit 1");
+        List<ScanNode> scanNodes = Lists.newArrayList();
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(0);
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.NoNeed);
+    }
+
+    @Test
+    public void testCacheModeTable() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT country, COUNT(userid) FROM userprofile GROUP BY country"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createProfileScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(0);
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Sql);
+    }
+
+    @Test
+    public void testWithinMinTime() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT country, COUNT(userid) FROM userprofile GROUP BY country"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createProfileScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579024800000L); //2020-01-15 02:00:00
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.None);
+    }
+
+    @Test
+    public void testPartitionModel() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-15\" GROUP BY eventdate"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition);
+    }
+
+    @Test
+    public void testParseByte() throws Exception {
+        RowBatchBuilder sb = new RowBatchBuilder(CacheMode.Partition);
+        byte[] buffer = new byte[]{10, 50, 48, 50, 48, 45, 48, 51, 45, 49, 48, 1, 51, 2, 67, 78};
+        PartitionRange.PartitionKeyType key1 = sb.getKeyFromRow(buffer, 0, Type.DATE);
+        LOG.info("real value key1 {}", key1.realValue());
+        Assert.assertEquals(key1.realValue(), 20200310);
+        PartitionRange.PartitionKeyType key2 = sb.getKeyFromRow(buffer, 1, Type.INT);
+        LOG.info("real value key2 {}", key2.realValue());
+        Assert.assertEquals(key2.realValue(), 3);
+    }
+
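The buffer in testParseByte is a length-prefixed row: each column is one length byte followed by that many ASCII bytes, so the three columns decode to "2020-03-10", "3" and "CN", which is why getKeyFromRow(buffer, 0, Type.DATE) yields 20200310 and index 1 yields 3. Decoding the framing by hand (plain Java, no Doris types) shows where the expected values come from:

// Walk the length-prefixed columns of the test row from testParseByte.
public class RowBytesDemo {
    public static void main(String[] args) {
        byte[] buffer = {10, 50, 48, 50, 48, 45, 48, 51, 45, 49, 48, 1, 51, 2, 67, 78};
        int pos = 0;
        while (pos < buffer.length) {
            int len = buffer[pos++];                          // one length byte ...
            System.out.println(new String(buffer, pos, len)); // ... then the ASCII payload
            pos += len;                                       // prints 2020-03-10, 3, CN
        }
    }
}
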
+    @Test
+    public void testPartitionIntTypeSql() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT `date`, COUNT(id) FROM `order` WHERE `date`>=20200112 and `date`<=20200115 GROUP BY date"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createOrderScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
+        try {
+            PartitionCache cache = (PartitionCache) ca.getCache();
+            cache.rewriteSelectStmt(null);
+            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);
+
+            PartitionRange range = cache.getPartitionRange();
+            boolean flag = range.analytics();
+            Assert.assertEquals(flag, true);
+
+            int size = range.getPartitionSingleList().size();
+            LOG.warn("Rewrite partition range size={}", size);
+            Assert.assertEquals(size, 4);
+
+            String sql;
+            range.setCacheFlag(20200112L); //get data from cache
+            range.setCacheFlag(20200113L); //get data from cache
+
+            hitRange = range.diskPartitionRange(newRangeList);
+            Assert.assertEquals(hitRange, Cache.HitRange.Left);
+            Assert.assertEquals(newRangeList.size(), 2);
+            Assert.assertEquals(newRangeList.get(0).getCacheKey().realValue(), 20200114);
+            Assert.assertEquals(newRangeList.get(1).getCacheKey().realValue(), 20200115);
+
+            cache.rewriteSelectStmt(newRangeList);
+            sql = ca.getRewriteStmt().getWhereClause().toSql();
+            Assert.assertEquals(sql, "(`date` >= 20200114) AND (`date` <= 20200115)");
+        } catch (Exception e) {
+            LOG.warn("ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testSimpleCacheSql() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-15\" GROUP BY eventdate"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
+        SelectStmt selectStmt = (SelectStmt) parseStmt;
+
+        try {
+            PartitionCache cache = (PartitionCache) ca.getCache();
+            cache.rewriteSelectStmt(null);
+            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);
+
+            PartitionRange range = cache.getPartitionRange();
+            boolean flag = range.analytics();
+            Assert.assertEquals(flag, true);
+
+            int size = range.getPartitionSingleList().size();
+            LOG.warn("Rewrite partition range size={}", size);
+            Assert.assertEquals(size, 4);
+
+            String sql;
+            range.setCacheFlag(20200112L); //get data from cache
+            range.setCacheFlag(20200113L); //get data from cache
+
+            hitRange = range.diskPartitionRange(newRangeList);
+            cache.rewriteSelectStmt(newRangeList);
+            sql = ca.getRewriteStmt().getWhereClause().toSql();
+            Assert.assertEquals(sql, "(`eventdate` >= '2020-01-14') AND (`eventdate` <= '2020-01-15')");
+        } catch (Exception e) {
+            LOG.warn("ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testHitPartPartition() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-14\" GROUP BY eventdate"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
+
+        try {
+            PartitionCache cache = (PartitionCache) ca.getCache();
+
+            cache.rewriteSelectStmt(null);
+            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);
+
+            PartitionRange range = cache.getPartitionRange();
+            boolean flag = range.analytics();
+            Assert.assertEquals(flag, true);
+
+            int size = range.getPartitionSingleList().size();
+            LOG.warn("Rewrite partition range size={}", size);
+            Assert.assertEquals(size, 3);
+
+            String sql;
+            range.setCacheFlag(20200113);
+            range.setCacheFlag(20200114);
+
+            hitRange = range.diskPartitionRange(newRangeList);
+            Assert.assertEquals(hitRange, Cache.HitRange.Right);
+            Assert.assertEquals(newRangeList.size(), 2);
+            Assert.assertEquals(newRangeList.get(0).getCacheKey().realValue(), 20200112);
+            Assert.assertEquals(newRangeList.get(1).getCacheKey().realValue(), 20200112);
+
+            List<PartitionRange.PartitionSingle> updateRangeList = range.updatePartitionRange();
+            Assert.assertEquals(updateRangeList.size(), 1);
+            Assert.assertEquals(updateRangeList.get(0).getCacheKey().realValue(), 20200112);
+        } catch (Exception e) {
+            LOG.warn("ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+    }
+
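Together with testPartitionIntTypeSql (HitRange.Left) and testNoUpdatePartition below (HitRange.Full), testHitPartPartition pins down diskPartitionRange's contract: the return value names which edge of the requested key range was served from cache, and newRangeList receives the begin/end keys of the contiguous range that still has to be read from disk, which is why a single-partition miss appears as two identical 20200112 entries. A self-contained sketch that reproduces those assertions (illustrative only; the patch's PartitionRange operates on partition objects, not bare longs):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch of the hit/miss-range contract the three tests assert.
class HitRangeDemo {
    enum HitRange { None, Left, Right, Full }

    static HitRange diskPartitionRange(long begin, long end, Set<Long> cached, List<Long> missOut) {
        long lo = begin, hi = end;
        while (lo <= hi && cached.contains(lo)) lo++; // cache hits on the left edge
        while (hi >= lo && cached.contains(hi)) hi--; // cache hits on the right edge
        if (lo > hi) return HitRange.Full;            // everything came from cache
        missOut.add(lo);                              // begin of the disk range
        missOut.add(hi);                              // end of the disk range
        if (lo > begin) return HitRange.Left;         // left side hit, read the right
        if (hi < end) return HitRange.Right;          // right side hit, read the left
        return HitRange.None;
    }

    public static void main(String[] args) {
        List<Long> miss = new ArrayList<>();
        HitRange hit = diskPartitionRange(20200112L, 20200114L, Set.of(20200113L, 20200114L), miss);
        System.out.println(hit + " " + miss); // Right [20200112, 20200112], as asserted above
    }
}
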
+    @Test
+    public void testNoUpdatePartition() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-14\" GROUP BY eventdate"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
+
+        try {
+            PartitionCache cache = (PartitionCache) ca.getCache();
+
+            cache.rewriteSelectStmt(null);
+            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);
+
+            PartitionRange range = cache.getPartitionRange();
+            boolean flag = range.analytics();
+            Assert.assertEquals(flag, true);
+
+            int size = range.getPartitionSingleList().size();
+            LOG.warn("Rewrite partition range size={}", size);
+            Assert.assertEquals(size, 3);
+
+            String sql;
+            range.setCacheFlag(20200112); //get data from cache
+            range.setCacheFlag(20200113);
+            range.setCacheFlag(20200114);
+
+            hitRange = range.diskPartitionRange(newRangeList);
+            Assert.assertEquals(hitRange, Cache.HitRange.Full);
+            Assert.assertEquals(newRangeList.size(), 0);
+        } catch (Exception e) {
+            LOG.warn("ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUpdatePartition() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-15\" GROUP BY eventdate"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
+
+        try {
+            PartitionCache cache = (PartitionCache) ca.getCache();
+
+            cache.rewriteSelectStmt(null);
+            Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null);
+
+            PartitionRange range = cache.getPartitionRange();
+            boolean flag = range.analytics();
+            Assert.assertEquals(flag, true);
+
+            int size = range.getPartitionSingleList().size();
+            LOG.warn("Rewrite partition range size={}", size);
+            Assert.assertEquals(size, 4);
+
+            String sql;
+            range.setCacheFlag(20200112L); //get data from cache
+            range.setTooNewByKey(20200115);
+
+            range.diskPartitionRange(newRangeList);
+            Assert.assertEquals(newRangeList.size(), 2);
+            cache.rewriteSelectStmt(newRangeList);
+
+            sql = ca.getRewriteStmt().getWhereClause().toSql();
+            Assert.assertEquals(sql, "(`eventdate` >= '2020-01-13') AND (`eventdate` <= '2020-01-15')");
+
+            List<PartitionRange.PartitionSingle> updateRangeList = range.updatePartitionRange();
+            Assert.assertEquals(updateRangeList.size(), 2);
+            Assert.assertEquals(updateRangeList.get(0).getCacheKey().realValue(), 20200113);
+            Assert.assertEquals(updateRangeList.get(1).getCacheKey().realValue(), 20200114);
+        } catch (Exception e) {
+            LOG.warn("ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRewriteMultiPredicate1() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>\"2020-01-11\" and eventdate<\"2020-01-16\"" +
+                " and eventid=1 GROUP BY eventdate"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
+        try {
+            PartitionCache cache = (PartitionCache) ca.getCache();
+
+            cache.rewriteSelectStmt(null);
+            LOG.warn("Nokey multi={}", cache.getNokeyStmt().getWhereClause().toSql());
+            Assert.assertEquals(cache.getNokeyStmt().getWhereClause().toSql(), "`eventid` = 1");
+
+            PartitionRange range = cache.getPartitionRange();
+            boolean flag = range.analytics();
+            Assert.assertEquals(flag, true);
+
+            int size = range.getPartitionSingleList().size();
+            Assert.assertEquals(size, 4);
+
+            String sql;
+            range.setCacheFlag(20200112L); //get data from cache
+            range.setCacheFlag(20200113L); //get data from cache
+
+            range.diskPartitionRange(newRangeList);
+
+            cache.rewriteSelectStmt(newRangeList);
+            sql = ca.getRewriteStmt().getWhereClause().toSql();
+            LOG.warn("MultiPredicate={}", sql);
+            Assert.assertEquals(sql, "((`eventdate` > '2020-01-13') AND (`eventdate` < '2020-01-16')) AND (`eventid` = 1)");
+        } catch (Exception e) {
+            LOG.warn("multi ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+    }
+
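testRewriteMultiPredicate1 exercises the two statement rewrites the partition cache relies on: getNokeyStmt() drops the partition-key conjuncts (leaving only `eventid` = 1) so that one normalized statement keys the cache regardless of the requested date range, and rewriteSelectStmt(newRangeList) re-adds the key predicate narrowed to the missed partitions. Conceptually the nokey step is a conjunct split like the sketch below (the patch does this on analyzed Expr trees, not on SQL text):

import java.util.ArrayList;
import java.util.List;

// Conceptual stand-in for building the "nokey" predicate: keep only the
// conjuncts that do not reference the partition key column.
class NokeyDemo {
    static List<String> nonKeyConjuncts(List<String> conjuncts, String keyColumn) {
        List<String> kept = new ArrayList<>();
        for (String c : conjuncts) {
            if (!c.contains(keyColumn)) { // real code checks Expr column bindings, not text
                kept.add(c);
            }
        }
        return kept; // for the test above: ["`eventid` = 1"]
    }
}
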
+    @Test
+    public void testRewriteJoin() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT appevent.eventdate, country, COUNT(appevent.userid) FROM appevent" +
+                " INNER JOIN userprofile ON appevent.userid = userprofile.userid" +
+                " WHERE appevent.eventdate>=\"2020-01-12\" and appevent.eventdate<=\"2020-01-15\"" +
+                " and eventid=1 GROUP BY appevent.eventdate, country"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
+        try {
+            PartitionCache cache = (PartitionCache) ca.getCache();
+            cache.rewriteSelectStmt(null);
+            LOG.warn("Join nokey={}", cache.getNokeyStmt().getWhereClause().toSql());
+            Assert.assertEquals(cache.getNokeyStmt().getWhereClause().toSql(), "`eventid` = 1");
+
+            PartitionRange range = cache.getPartitionRange();
+            boolean flag = range.analytics();
+            Assert.assertEquals(flag, true);
+
+            int size = range.getPartitionSingleList().size();
+            Assert.assertEquals(size, 4);
+
+            String sql;
+            range.setCacheFlag(20200112L); //get data from cache
+            range.setCacheFlag(20200113L); //get data from cache
+
+            range.diskPartitionRange(newRangeList);
+
+            cache.rewriteSelectStmt(newRangeList);
+            sql = ca.getRewriteStmt().getWhereClause().toSql();
+            LOG.warn("Join rewrite={}", sql);
+            Assert.assertEquals(sql, "((`appevent`.`eventdate` >= '2020-01-14')" +
+                    " AND (`appevent`.`eventdate` <= '2020-01-15')) AND (`eventid` = 1)");
+        } catch (Exception e) {
+            LOG.warn("Join ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testSubSelect() throws Exception {
+        StatementBase parseStmt = parseSql(
+                "SELECT eventdate, sum(pv) FROM (SELECT eventdate, COUNT(userid) AS pv FROM appevent WHERE eventdate>\"2020-01-11\" AND eventdate<\"2020-01-16\"" +
+                " AND eventid=1 GROUP BY eventdate) tbl GROUP BY eventdate"
+        );
+        List<ScanNode> scanNodes = Lists.newArrayList(createEventScanNode());
+        CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
+        ca.checkCacheMode(1579053661000L); //2020-01-15 10:01:01
+        Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache mode first
+        try {
+            PartitionCache cache = (PartitionCache) ca.getCache();
+
+            cache.rewriteSelectStmt(null);
+            LOG.warn("Sub nokey={}", cache.getNokeyStmt().toSql());
+            Assert.assertEquals(cache.getNokeyStmt().toSql(), "SELECT `eventdate` AS `eventdate`, sum(`pv`) AS `sum(``pv``)` FROM (" +
+                    "SELECT `eventdate` AS `eventdate`, count(`userid`) AS `pv` FROM `testCluster:testDb`.`appevent` WHERE `eventid` = 1" +
+                    " GROUP BY `eventdate`) tbl GROUP BY `eventdate`");
+
+            PartitionRange range = cache.getPartitionRange();
+            boolean flag = range.analytics();
+            Assert.assertEquals(flag, true);
+
+            int size = range.getPartitionSingleList().size();
+            Assert.assertEquals(size, 4);
+
+            String sql;
+            range.setCacheFlag(20200112L); //get data from cache
+            range.setCacheFlag(20200113L); //get data from cache
+
+            range.diskPartitionRange(newRangeList);
+
+            cache.rewriteSelectStmt(newRangeList);
+            sql = ca.getRewriteStmt().toSql();
+            LOG.warn("Sub rewrite={}", sql);
+            Assert.assertEquals(sql, "SELECT `eventdate` AS `eventdate`, sum(`pv`) AS `sum(``pv``)` FROM (" +
+                    "SELECT `eventdate` AS `eventdate`, count(`userid`) AS `pv` FROM `testCluster:testDb`.`appevent` WHERE " +
+                    "((`eventdate` > '2020-01-13') AND (`eventdate` < '2020-01-16')) AND (`eventid` = 1) GROUP BY `eventdate`) tbl GROUP BY `eventdate`");
+        } catch (Exception e) {
+            LOG.warn("sub ex={}", e);
+            Assert.fail(e.getMessage());
+        }
+    }
+}
+
diff --git a/run-ut.sh b/run-ut.sh
index 6c9fdd6bb9675d..6b6288fe769ffe 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -238,6 +238,7 @@ ${DORIS_TEST_BINARY_DIR}/runtime/small_file_mgr_test
 ${DORIS_TEST_BINARY_DIR}/runtime/mem_pool_test
 ${DORIS_TEST_BINARY_DIR}/runtime/memory/chunk_allocator_test
 ${DORIS_TEST_BINARY_DIR}/runtime/memory/system_allocator_test
+${DORIS_TEST_BINARY_DIR}/runtime/cache/partition_cache_test
 # Running expr Unittest
 # Running http