getCache() {
+ return cache;
+ }
+
+ private byte[] decompress(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ final int decompressedLen = ByteBuffer.wrap(bytes).getInt();
+ final byte[] out = new byte[decompressedLen];
+ LZ4_DECOMPRESSOR.decompress(bytes, Integer.BYTES, out, 0, out.length);
+ return out;
+ }
+
+ private byte[] compress(byte[] value) {
+ final int len = LZ4_COMPRESSOR.maxCompressedLength(value.length);
+ final byte[] out = new byte[len];
+ final int compressedSize = LZ4_COMPRESSOR.compress(value, 0, value.length, out, 0);
+ return ByteBuffer.allocate(compressedSize + Integer.BYTES)
+ .putInt(value.length)
+ .put(out, 0, compressedSize)
+ .array();
+ }
+}
\ No newline at end of file
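For reference, the framing produced by compress() above is [4-byte uncompressed length][LZ4 block], and decompress() reads it back in the same order. A minimal round-trip sketch of that framing, assuming the lz4-java library (net.jpountz.lz4) that the LZ4_COMPRESSOR/LZ4_DECOMPRESSOR fields presumably come from:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import net.jpountz.lz4.LZ4Compressor;
    import net.jpountz.lz4.LZ4Factory;
    import net.jpountz.lz4.LZ4FastDecompressor;

    public class Lz4FramingDemo {
        public static void main(String[] args) {
            LZ4Factory factory = LZ4Factory.fastestInstance();
            byte[] original = "result cache payload".getBytes(StandardCharsets.UTF_8);

            // Compress and frame: [4-byte uncompressed length][LZ4 block]
            LZ4Compressor compressor = factory.fastCompressor();
            byte[] block = new byte[compressor.maxCompressedLength(original.length)];
            int compressedSize = compressor.compress(original, 0, original.length, block, 0);
            byte[] framed = ByteBuffer.allocate(Integer.BYTES + compressedSize)
                    .putInt(original.length)
                    .put(block, 0, compressedSize)
                    .array();

            // Decompress: read the length prefix, then decode starting at offset 4.
            int decompressedLen = ByteBuffer.wrap(framed).getInt();
            byte[] out = new byte[decompressedLen];
            LZ4FastDecompressor decompressor = factory.fastDecompressor();
            decompressor.decompress(framed, Integer.BYTES, out, 0, out.length);

            System.out.println(new String(out, StandardCharsets.UTF_8)); // result cache payload
        }
    }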
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 63aa2c51779f73..33352a86eee632 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -445,6 +445,7 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, json);
}
+ @Deprecated
private void readFields(DataInput in) throws IOException {
name = Text.readString(in);
type = ColumnType.read(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java
index 12e69ecad7b834..a7fffe2ae483a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java
@@ -119,6 +119,7 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, json);
}
+ @Deprecated
private void readFields(DataInput in) throws IOException {
ip = Text.readString(in);
port = in.readInt();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
index 4a1c59d6a095ab..b5fb03c87454d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
@@ -133,6 +133,7 @@ public static TempPartitions read(DataInput in) throws IOException {
}
}
+ @Deprecated
private void readFields(DataInput in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
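The three readFields() methods above are deprecated because each class's write() already persists the object as a JSON string (Text.writeString(out, json)); readFields() remains only for reading images produced by older FE versions. The matching read() methods are not part of this diff, but the JSON-based path in this codebase typically looks like the following sketch (the GsonUtils helper is an assumption based on the rest of Doris, not code from this PR):

    // Hypothetical sketch of the JSON counterpart that supersedes readFields().
    public static Column read(DataInput in) throws IOException {
        String json = Text.readString(in);              // read the JSON written by write()
        return GsonUtils.GSON.fromJson(json, Column.class);
    }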
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 1623c3ed04159f..4d0f8013156062 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -21,93 +21,108 @@
import org.apache.doris.http.HttpServer;
public class Config extends ConfigBase {
-
+
/**
* The max size of one sys log and audit log
*/
- @ConfField public static int log_roll_size_mb = 1024; // 1 GB
+ @ConfField
+ public static int log_roll_size_mb = 1024; // 1 GB
/**
* sys_log_dir:
- * This specifies FE log dir. FE will produces 2 log files:
- * fe.log: all logs of FE process.
- * fe.warn.log all WARNING and ERROR log of FE process.
- *
+ * This specifies the FE log dir. FE will produce 2 log files:
+ * fe.log: all logs of the FE process.
+ * fe.warn.log: all WARNING and ERROR logs of the FE process.
+ *
* sys_log_level:
- * INFO, WARNING, ERROR, FATAL
- *
+ * INFO, WARNING, ERROR, FATAL
+ *
* sys_log_roll_num:
- * Maximal FE log files to be kept within an sys_log_roll_interval.
- * default is 10, which means there will be at most 10 log files in a day
- *
+ * Maximal FE log files to be kept within a sys_log_roll_interval.
+ * default is 10, which means there will be at most 10 log files in a day
+ *
* sys_log_verbose_modules:
- * Verbose modules. VERBOSE level is implemented by log4j DEBUG level.
- * eg:
- * sys_log_verbose_modules = org.apache.doris.catalog
- * This will only print debug log of files in package org.apache.doris.catalog and all its sub packages.
- *
+ * Verbose modules. VERBOSE level is implemented by log4j DEBUG level.
+ * eg:
+ * sys_log_verbose_modules = org.apache.doris.catalog
+ * This will only print debug log of files in package org.apache.doris.catalog and all its sub packages.
+ *
* sys_log_roll_interval:
- * DAY: log suffix is yyyyMMdd
- * HOUR: log suffix is yyyyMMddHH
- *
+ * DAY: log suffix is yyyyMMdd
+ * HOUR: log suffix is yyyyMMddHH
+ *
* sys_log_delete_age:
- * default is 7 days, if log's last modify time is 7 days ago, it will be deleted.
- * support format:
- * 7d 7 days
- * 10h 10 hours
- * 60m 60 mins
- * 120s 120 seconds
+ * default is 7 days: if a log's last modified time is more than 7 days ago, it will be deleted.
+ * supported formats:
+ * 7d 7 days
+ * 10h 10 hours
+ * 60m 60 mins
+ * 120s 120 seconds
*/
@ConfField
public static String sys_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
- @ConfField public static String sys_log_level = "INFO";
- @ConfField public static int sys_log_roll_num = 10;
- @ConfField public static String[] sys_log_verbose_modules = {};
- @ConfField public static String sys_log_roll_interval = "DAY";
- @ConfField public static String sys_log_delete_age = "7d";
+ @ConfField
+ public static String sys_log_level = "INFO";
+ @ConfField
+ public static int sys_log_roll_num = 10;
+ @ConfField
+ public static String[] sys_log_verbose_modules = {};
+ @ConfField
+ public static String sys_log_roll_interval = "DAY";
+ @ConfField
+ public static String sys_log_delete_age = "7d";
@Deprecated
- @ConfField public static String sys_log_roll_mode = "SIZE-MB-1024";
+ @ConfField
+ public static String sys_log_roll_mode = "SIZE-MB-1024";
/**
* audit_log_dir:
- * This specifies FE audit log dir.
- * Audit log fe.audit.log contains all requests with related infos such as user, host, cost, status, etc.
- *
+ * This specifies FE audit log dir.
+ * Audit log fe.audit.log contains all requests with related info such as user, host, cost, status, etc.
+ *
* audit_log_roll_num:
- * Maximal FE audit log files to be kept within an audit_log_roll_interval.
- *
+ * Maximal FE audit log files to be kept within an audit_log_roll_interval.
+ *
* audit_log_modules:
- * Slow query contains all queries which cost exceed *qe_slow_log_ms*
- *
+ * The slow_query module contains all queries whose cost exceeds *qe_slow_log_ms*
+ *
* qe_slow_log_ms:
- * If the response time of a query exceed this threshold, it will be recored in audit log as slow_query.
- *
+ * If the response time of a query exceeds this threshold, it will be recorded in the audit log as slow_query.
+ *
* audit_log_roll_interval:
- * DAY: log suffix is yyyyMMdd
- * HOUR: log suffix is yyyyMMddHH
- *
+ * DAY: log suffix is yyyyMMdd
+ * HOUR: log suffix is yyyyMMddHH
+ *
* audit_log_delete_age:
- * default is 30 days, if log's last modify time is 30 days ago, it will be deleted.
- * support format:
- * 7d 7 days
- * 10h 10 hours
- * 60m 60 mins
- * 120s 120 seconds
- */
- @ConfField public static String audit_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
- @ConfField public static int audit_log_roll_num = 90;
- @ConfField public static String[] audit_log_modules = {"slow_query", "query"};
- @ConfField(mutable = true) public static long qe_slow_log_ms = 5000;
- @ConfField public static String audit_log_roll_interval = "DAY";
- @ConfField public static String audit_log_delete_age = "30d";
+ * default is 30 days: if a log's last modified time is more than 30 days ago, it will be deleted.
+ * supported formats:
+ * 7d 7 days
+ * 10h 10 hours
+ * 60m 60 mins
+ * 120s 120 seconds
+ */
+ @ConfField
+ public static String audit_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
+ @ConfField
+ public static int audit_log_roll_num = 90;
+ @ConfField
+ public static String[] audit_log_modules = {"slow_query", "query"};
+ @ConfField(mutable = true)
+ public static long qe_slow_log_ms = 5000;
+ @ConfField
+ public static String audit_log_roll_interval = "DAY";
+ @ConfField
+ public static String audit_log_delete_age = "30d";
@Deprecated
- @ConfField public static String audit_log_roll_mode = "TIME-DAY";
+ @ConfField
+ public static String audit_log_roll_mode = "TIME-DAY";
/**
* plugin_dir:
- * plugin install directory
+ * plugin install directory
*/
- @ConfField public static String plugin_dir = System.getenv("DORIS_HOME") + "/plugins";
+ @ConfField
+ public static String plugin_dir = System.getenv("DORIS_HOME") + "/plugins";
@ConfField(mutable = true, masterOnly = true)
public static boolean plugin_enable = true;
@@ -120,24 +135,26 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int label_keep_max_second = 3 * 24 * 3600; // 3 days
-
+
/**
* The max keep time of some kind of jobs.
* like schema change job and rollup job.
*/
@ConfField(mutable = true, masterOnly = true)
public static int history_job_keep_max_second = 7 * 24 * 3600; // 7 days
-
+
/**
* Load label cleaner will run every *label_clean_interval_second* to clean the outdated jobs.
*/
- @ConfField public static int label_clean_interval_second = 4 * 3600; // 4 hours
-
+ @ConfField
+ public static int label_clean_interval_second = 4 * 3600; // 4 hours
+
/**
* the transaction will be cleaned after transaction_clean_interval_second seconds if the transaction is visible or aborted
* we should make this interval as short as possible and each clean cycle as soon as possible
*/
- @ConfField public static int transaction_clean_interval_second = 30;
+ @ConfField
+ public static int transaction_clean_interval_second = 30;
// Configurations for meta data durability
/**
@@ -146,14 +163,16 @@ public class Config extends ConfigBase {
* 1. High write performance (SSD)
* 2. Safe (RAID)
*/
- @ConfField public static String meta_dir = PaloFe.DORIS_HOME_DIR + "/doris-meta";
-
+ @ConfField
+ public static String meta_dir = PaloFe.DORIS_HOME_DIR + "/doris-meta";
+
/**
* temp dir is used to save intermediate results of some process, such as backup and restore process.
* file in this dir will be cleaned after these process is finished.
*/
- @ConfField public static String tmp_dir = PaloFe.DORIS_HOME_DIR + "/temp_dir";
-
+ @ConfField
+ public static String tmp_dir = PaloFe.DORIS_HOME_DIR + "/temp_dir";
+
/**
* Edit log type.
* BDB: write log to bdbje
@@ -161,51 +180,56 @@ public class Config extends ConfigBase {
*/
@ConfField
public static String edit_log_type = "BDB";
-
+
/**
* bdbje port
*/
@ConfField
public static int edit_log_port = 9010;
-
+
/**
* Master FE will save image every *edit_log_roll_num* meta journals.
*/
@ConfField(mutable = true, masterOnly = true)
public static int edit_log_roll_num = 50000;
-
+
/**
* Non-master FE will stop offering service
* if meta data delay gap exceeds *meta_delay_toleration_second*
*/
- @ConfField public static int meta_delay_toleration_second = 300; // 5 min
-
+ @ConfField
+ public static int meta_delay_toleration_second = 300; // 5 min
+
/**
* Master FE sync policy of bdbje.
* If you only deploy one Follower FE, set this to 'SYNC'. If you deploy more than 3 Follower FE,
* you can set this and the following 'replica_sync_policy' to WRITE_NO_SYNC.
* more info, see: http://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.SyncPolicy.html
*/
- @ConfField public static String master_sync_policy = "SYNC"; // SYNC, NO_SYNC, WRITE_NO_SYNC
-
+ @ConfField
+ public static String master_sync_policy = "SYNC"; // SYNC, NO_SYNC, WRITE_NO_SYNC
+
/**
* Follower FE sync policy of bdbje.
*/
- @ConfField public static String replica_sync_policy = "SYNC"; // SYNC, NO_SYNC, WRITE_NO_SYNC
-
+ @ConfField
+ public static String replica_sync_policy = "SYNC"; // SYNC, NO_SYNC, WRITE_NO_SYNC
+
/**
* Replica ack policy of bdbje.
* more info, see: http://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.ReplicaAckPolicy.html
*/
- @ConfField public static String replica_ack_policy = "SIMPLE_MAJORITY"; // ALL, NONE, SIMPLE_MAJORITY
-
+ @ConfField
+ public static String replica_ack_policy = "SIMPLE_MAJORITY"; // ALL, NONE, SIMPLE_MAJORITY
+
/**
* The heartbeat timeout of bdbje between master and follower.
* the default is 30 seconds, which is same as default value in bdbje.
* If the network is experiencing transient problems, of some unexpected long java GC annoying you,
* you can try to increase this value to decrease the chances of false timeouts
*/
- @ConfField public static int bdbje_heartbeat_timeout_second = 30;
+ @ConfField
+ public static int bdbje_heartbeat_timeout_second = 30;
/**
* The lock timeout of bdbje operation
@@ -235,7 +259,8 @@ public class Config extends ConfigBase {
/**
* the max txn number which bdbje can rollback when trying to rejoin the group
*/
- @ConfField public static int txn_rollback_limit = 100;
+ @ConfField
+ public static int txn_rollback_limit = 100;
/**
* Specified an IP for frontend, instead of the ip get by *InetAddress.getByName*.
@@ -243,7 +268,8 @@ public class Config extends ConfigBase {
* Default is "0.0.0.0", which means not set.
* CAN NOT set this as a hostname, only IP.
*/
- @ConfField public static String frontend_address = "0.0.0.0";
+ @ConfField
+ public static String frontend_address = "0.0.0.0";
/**
* Declare a selection strategy for those servers have many ips.
@@ -251,7 +277,8 @@ public class Config extends ConfigBase {
* this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
* If no ip match this rule, will choose one randomly.
*/
- @ConfField public static String priority_networks = "";
+ @ConfField
+ public static String priority_networks = "";
/**
* If true, FE will reset bdbje replication group(that is, to remove all electable nodes info)
@@ -259,13 +286,14 @@ public class Config extends ConfigBase {
* If all the electable nodes can not start, we can copy the meta data
* to another node and set this config to true to try to restart the FE.
*/
- @ConfField public static String metadata_failure_recovery = "false";
+ @ConfField
+ public static String metadata_failure_recovery = "false";
/**
* If true, non-master FE will ignore the meta data delay gap between Master FE and itself,
* even if the metadata delay gap exceeds *meta_delay_toleration_second*.
* Non-master FE will still offer read service.
- *
+ *
* This is helpful when you try to stop the Master FE for a relatively long time for some reason,
* but still wish the non-master FE can offer read service.
*/
@@ -277,146 +305,164 @@ public class Config extends ConfigBase {
* This value is checked whenever a non-master FE establishes a connection to master FE via BDBJE.
* The connection is abandoned if the clock skew is larger than this value.
*/
- @ConfField public static long max_bdbje_clock_delta_ms = 5000; // 5s
+ @ConfField
+ public static long max_bdbje_clock_delta_ms = 5000; // 5s
/**
* Fe http port
* Currently, all FEs' http port must be same.
*/
- @ConfField public static int http_port = 8030;
+ @ConfField
+ public static int http_port = 8030;
/*
* Netty http param
*/
- @ConfField public static int http_max_line_length = HttpServer.DEFAULT_MAX_LINE_LENGTH;
+ @ConfField
+ public static int http_max_line_length = HttpServer.DEFAULT_MAX_LINE_LENGTH;
- @ConfField public static int http_max_header_size = HttpServer.DEFAULT_MAX_HEADER_SIZE;
+ @ConfField
+ public static int http_max_header_size = HttpServer.DEFAULT_MAX_HEADER_SIZE;
- @ConfField public static int http_max_chunk_size = HttpServer.DEFAULT_MAX_CHUNK_SIZE;
+ @ConfField
+ public static int http_max_chunk_size = HttpServer.DEFAULT_MAX_CHUNK_SIZE;
/**
* The backlog_num for netty http server
* When you enlarge this backlog_num, you should ensure its value is larger than
* the linux /proc/sys/net/core/somaxconn config
*/
- @ConfField public static int http_backlog_num = 1024;
+ @ConfField
+ public static int http_backlog_num = 1024;
/**
* The connection timeout and socket timeout config for thrift server
* The value for thrift_client_timeout_ms is set to be larger than zero to prevent
* some hang up problems in java.net.SocketInputStream.socketRead0
*/
- @ConfField public static int thrift_client_timeout_ms = 30000;
+ @ConfField
+ public static int thrift_client_timeout_ms = 30000;
/**
* The backlog_num for thrift server
* When you enlarge this backlog_num, you should ensure its value is larger than
* the linux /proc/sys/net/core/somaxconn config
*/
- @ConfField public static int thrift_backlog_num = 1024;
+ @ConfField
+ public static int thrift_backlog_num = 1024;
/**
* FE thrift server port
*/
- @ConfField public static int rpc_port = 9020;
-
+ @ConfField
+ public static int rpc_port = 9020;
+
/**
* FE mysql server port
*/
- @ConfField public static int query_port = 9030;
+ @ConfField
+ public static int query_port = 9030;
/**
* mysql service nio option.
*/
- @ConfField public static boolean mysql_service_nio_enabled = true;
+ @ConfField
+ public static boolean mysql_service_nio_enabled = true;
/**
* num of thread to handle io events in mysql.
*/
- @ConfField public static int mysql_service_io_threads_num = 4;
+ @ConfField
+ public static int mysql_service_io_threads_num = 4;
/**
* max num of thread to handle task in mysql.
*/
- @ConfField public static int max_mysql_service_task_threads_num = 4096;
+ @ConfField
+ public static int max_mysql_service_task_threads_num = 4096;
/**
* Cluster name will be shown as the title of web page
*/
- @ConfField public static String cluster_name = "Baidu Palo";
-
+ @ConfField
+ public static String cluster_name = "Baidu Palo";
+
/**
* node(FE or BE) will be considered belonging to the same Palo cluster if they have same cluster id.
* Cluster id is usually a random integer generated when master FE start at first time.
* You can also specify one.
*/
- @ConfField public static int cluster_id = -1;
-
+ @ConfField
+ public static int cluster_id = -1;
+
/**
* Cluster token used for internal authentication.
*/
- @ConfField public static String auth_token = "";
+ @ConfField
+ public static String auth_token = "";
// Configurations for load, clone, create table, alter table etc. We will rarely change them
/**
* Maximal waiting time for creating a single replica.
* eg.
- * if you create a table with #m tablets and #n replicas for each tablet,
- * the create table request will run at most (m * n * tablet_create_timeout_second) before timeout.
+ * if you create a table with #m tablets and #n replicas for each tablet,
+ * the create table request will run at most (m * n * tablet_create_timeout_second) before timeout.
*/
@ConfField(mutable = true, masterOnly = true)
public static int tablet_create_timeout_second = 1;
-
+
/**
* In order not to wait too long for create table(index), set a max timeout.
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_create_table_timeout_second = 60;
-
+
/**
* Maximal waiting time for all publish version tasks of one transaction to be finished
*/
@ConfField(mutable = true, masterOnly = true)
public static int publish_version_timeout_second = 30; // 30 seconds
-
+
/**
* minimal intervals between two publish version action
*/
- @ConfField public static int publish_version_interval_ms = 10;
+ @ConfField
+ public static int publish_version_interval_ms = 10;
/**
* The thrift server max worker threads
*/
- @ConfField public static int thrift_server_max_worker_threads = 4096;
+ @ConfField
+ public static int thrift_server_max_worker_threads = 4096;
/**
* Maximal wait seconds for straggler node in load
* eg.
- * there are 3 replicas A, B, C
- * load is already quorum finished(A,B) at t1 and C is not finished
- * if (current_time - t1) > 300s, then palo will treat C as a failure node
- * will call transaction manager to commit the transaction and tell transaction manager
- * that C is failed
- *
+ * there are 3 replicas A, B, C
+ * the load is already quorum-finished (A, B) at t1 while C is not finished
+ * if (current_time - t1) > 300s, Palo will treat C as a failed node,
+ * call the transaction manager to commit the transaction, and tell the
+ * transaction manager that C failed
+ *
* This is also used when waiting for publish tasks
- *
+ *
* TODO this parameter is the default value for all job and the DBA could specify it for separate job
*/
@ConfField(mutable = true, masterOnly = true)
public static int load_straggler_wait_second = 300;
-
+
/**
* Maximal memory layout length of a row. default is 100 KB.
* In BE, the maximal size of a RowBlock is 100MB(Configure as max_unpacked_row_block_size in be.conf).
* And each RowBlock contains 1024 rows. So the maximal size of a row is approximately 100 KB.
- *
+ *
* eg.
- * schema: k1(int), v1(decimal), v2(varchar(2000))
- * then the memory layout length of a row is: 8(int) + 40(decimal) + 2000(varchar) = 2048 (Bytes)
- *
+ * schema: k1(int), v1(decimal), v2(varchar(2000))
+ * then the memory layout length of a row is: 8(int) + 40(decimal) + 2000(varchar) = 2048 (Bytes)
+ *
* See memory layout length of all types, run 'help create table' in mysql-client.
- *
- * If you want to increase this number to support more columns in a row, you also need to increase the
+ *
+ * If you want to increase this number to support more columns in a row, you also need to increase the
* max_unpacked_row_block_size in be.conf. But the performance impact is unknown.
*/
@ConfField(mutable = true, masterOnly = true)
@@ -426,10 +472,11 @@ public class Config extends ConfigBase {
* The load scheduler running interval.
* A load job will transfer its state from PENDING to LOADING to FINISHED.
* The load scheduler will transfer load job from PENDING to LOADING
- * while the txn callback will transfer load job from LOADING to FINISHED.
+ * while the txn callback will transfer load job from LOADING to FINISHED.
* So a load job will cost at most one interval to finish when the concurrency has not reached the upper limit.
*/
- @ConfField public static int load_checker_interval_second = 5;
+ @ConfField
+ public static int load_checker_interval_second = 5;
/**
* Concurrency of HIGH priority pending load jobs.
@@ -440,26 +487,31 @@ public class Config extends ConfigBase {
* Currently, you can not specified the job priority manually,
* and do not change this if you know what you are doing.
*/
- @ConfField public static int load_pending_thread_num_high_priority = 3;
+ @ConfField
+ public static int load_pending_thread_num_high_priority = 3;
/**
* Concurrency of NORMAL priority pending load jobs.
* Do not change this if you know what you are doing.
*/
- @ConfField public static int load_pending_thread_num_normal_priority = 10;
+ @ConfField
+ public static int load_pending_thread_num_normal_priority = 10;
/**
* Concurrency of HIGH priority etl load jobs.
* Do not change this if you know what you are doing.
*/
- @ConfField public static int load_etl_thread_num_high_priority = 3;
+ @ConfField
+ public static int load_etl_thread_num_high_priority = 3;
/**
* Concurrency of NORMAL priority etl load jobs.
* Do not change this if you know what you are doing.
*/
- @ConfField public static int load_etl_thread_num_normal_priority = 10;
+ @ConfField
+ public static int load_etl_thread_num_normal_priority = 10;
/**
* Concurrency of delete jobs.
*/
- @ConfField public static int delete_thread_num = 10;
+ @ConfField
+ public static int delete_thread_num = 10;
/**
* Not available.
*/
@@ -482,13 +534,13 @@ public class Config extends ConfigBase {
@Deprecated
@ConfField(mutable = true, masterOnly = true)
public static int mini_load_default_timeout_second = 3600; // 1 hour
-
+
/**
* Default insert load timeout
*/
@ConfField(mutable = true, masterOnly = true)
public static int insert_load_default_timeout_second = 3600; // 1 hour
-
+
/**
* Default stream load and streaming mini load timeout
*/
@@ -550,7 +602,7 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int desired_max_waiting_jobs = 100;
-
+
/**
* maximum concurrent running txn num including prepare, commit txns under a single db
* txn manager will reject coming txns
@@ -574,7 +626,8 @@ public class Config extends ConfigBase {
/**
* Clone checker's running interval.
*/
- @ConfField public static int clone_checker_interval_second = 300;
+ @ConfField
+ public static int clone_checker_interval_second = 300;
/**
* Default timeout of a single clone job. Set long enough to fit your replica size.
* The larger the replica data size is, the more time it will cost to finish clone.
@@ -593,7 +646,7 @@ public class Config extends ConfigBase {
* If the priority is LOW, it will be delayed *clone_low_priority_delay_second*
* after the job creation and then be executed.
* This is to avoid a large number of clone jobs running at same time only because a host is down for a short time.
- *
+ *
* NOTICE that this config(and *clone_normal_priority_delay_second* as well)
* will not work if it's smaller then *clone_checker_interval_second*
*/
@@ -650,14 +703,16 @@ public class Config extends ConfigBase {
* When create a table(or partition), you can specify its storage medium(HDD or SSD).
* If not set, this specifies the default medium when creat.
*/
- @ConfField public static String default_storage_medium = "HDD";
+ @ConfField
+ public static String default_storage_medium = "HDD";
/**
* When create a table(or partition), you can specify its storage medium(HDD or SSD).
* If set to SSD, this specifies the default duration that tablets will stay on SSD.
* After that, tablets will be moved to HDD automatically.
* You can set storage cooldown time in CREATE TABLE stmt.
*/
- @ConfField public static long storage_cooldown_second = 30 * 24 * 3600L; // 30 days
+ @ConfField
+ public static long storage_cooldown_second = 30 * 24 * 3600L; // 30 days
/**
* After dropping database(table/partition), you can recover it by using RECOVER stmt.
* And this specifies the maximal data retention time. After time, the data will be deleted permanently.
@@ -680,7 +735,8 @@ public class Config extends ConfigBase {
/**
* Export checker's running interval.
*/
- @ConfField public static int export_checker_interval_second = 5;
+ @ConfField
+ public static int export_checker_interval_second = 5;
/**
* Limitation of the concurrency of running export jobs.
* Default is 5.
@@ -718,12 +774,14 @@ public class Config extends ConfigBase {
/**
* Maximal number of connections per FE.
*/
- @ConfField public static int qe_max_connection = 1024;
+ @ConfField
+ public static int qe_max_connection = 1024;
/**
* Maximal number of thread in connection-scheduler-pool.
*/
- @ConfField public static int max_connection_scheduler_threads_num = 4096;
+ @ConfField
+ public static int max_connection_scheduler_threads_num = 4096;
/**
* The memory_limit for colocote join PlanFragment instance =
@@ -740,9 +798,12 @@ public class Config extends ConfigBase {
/**
* The default user resource publishing timeout.
*/
- @ConfField public static int meta_publish_timeout_ms = 1000;
- @ConfField public static boolean proxy_auth_enable = false;
- @ConfField public static String proxy_auth_magic_prefix = "x@8";
+ @ConfField
+ public static int meta_publish_timeout_ms = 1000;
+ @ConfField
+ public static boolean proxy_auth_enable = false;
+ @ConfField
+ public static String proxy_auth_magic_prefix = "x@8";
/**
* Limit on the number of expr children of an expr tree.
* Exceed this limit may cause long analysis time while holding database read lock.
@@ -763,16 +824,21 @@ public class Config extends ConfigBase {
* Plugins' path for BACKUP and RESTORE operations. Currently deprecated.
*/
@Deprecated
- @ConfField public static String backup_plugin_path = "/tools/trans_file_tool/trans_files.sh";
+ @ConfField
+ public static String backup_plugin_path = "/tools/trans_file_tool/trans_files.sh";
// Configurations for hadoop dpp
/**
* The following configurations are not available.
*/
- @ConfField public static String dpp_hadoop_client_path = "/lib/hadoop-client/hadoop/bin/hadoop";
- @ConfField public static long dpp_bytes_per_reduce = 100 * 1024 * 1024L; // 100M
- @ConfField public static String dpp_default_cluster = "palo-dpp";
- @ConfField public static String dpp_default_config_str = ""
+ @ConfField
+ public static String dpp_hadoop_client_path = "/lib/hadoop-client/hadoop/bin/hadoop";
+ @ConfField
+ public static long dpp_bytes_per_reduce = 100 * 1024 * 1024L; // 100M
+ @ConfField
+ public static String dpp_default_cluster = "palo-dpp";
+ @ConfField
+ public static String dpp_default_config_str = ""
+ "{"
+ "hadoop_configs : '"
+ "mapred.job.priority=NORMAL;"
@@ -784,7 +850,8 @@ public class Config extends ConfigBase {
+ "dfs.client.authserver.force_stop=true;"
+ "dfs.client.auth.method=0"
+ "'}";
- @ConfField public static String dpp_config_str = ""
+ @ConfField
+ public static String dpp_config_str = ""
+ "{palo-dpp : {"
+ "hadoop_palo_path : '/dir',"
+ "hadoop_configs : '"
@@ -796,28 +863,32 @@ public class Config extends ConfigBase {
// For forward compatibility, will be removed later.
// check token when download image file.
- @ConfField public static boolean enable_token_check = true;
+ @ConfField
+ public static boolean enable_token_check = true;
/**
* Set to true if you deploy Palo using thirdparty deploy manager
* Valid options are:
- * disable: no deploy manager
- * k8s: Kubernetes
- * ambari: Ambari
- * local: Local File (for test or Boxer2 BCC version)
+ * disable: no deploy manager
+ * k8s: Kubernetes
+ * ambari: Ambari
+ * local: Local File (for test or Boxer2 BCC version)
*/
- @ConfField public static String enable_deploy_manager = "disable";
-
+ @ConfField
+ public static String enable_deploy_manager = "disable";
+
// If use k8s deploy manager locally, set this to true and prepare the certs files
- @ConfField public static boolean with_k8s_certs = false;
-
+ @ConfField
+ public static boolean with_k8s_certs = false;
+
// Set runtime locale when exec some cmds
- @ConfField public static String locale = "zh_CN.UTF-8";
+ @ConfField
+ public static String locale = "zh_CN.UTF-8";
// default timeout of backup job
@ConfField(mutable = true, masterOnly = true)
public static int backup_job_default_timeout_ms = 86400 * 1000; // 1 day
-
+
/**
* 'storage_high_watermark_usage_percent' limit the max capacity usage percent of a Backend storage path.
* 'storage_min_left_capacity_bytes' limit the minimum left capacity of a Backend storage path.
@@ -842,34 +913,38 @@ public class Config extends ConfigBase {
// update interval of tablet stat
// All frontends will get tablet stat from all backends at each interval
- @ConfField public static int tablet_stat_update_interval_second = 300; // 5 min
+ @ConfField
+ public static int tablet_stat_update_interval_second = 300; // 5 min
// May be necessary to modify the following BRPC configurations in high concurrency scenarios.
// The number of concurrent requests BRPC can processed
- @ConfField public static int brpc_number_of_concurrent_requests_processed = 4096;
+ @ConfField
+ public static int brpc_number_of_concurrent_requests_processed = 4096;
// BRPC idle wait time (ms)
- @ConfField public static int brpc_idle_wait_max_time = 10000;
-
+ @ConfField
+ public static int brpc_idle_wait_max_time = 10000;
+
/**
- * if set to false, auth check will be disable, in case some goes wrong with the new privilege system.
+ * if set to false, auth check will be disabled, in case something goes wrong with the new privilege system.
*/
- @ConfField public static boolean enable_auth_check = true;
-
+ @ConfField
+ public static boolean enable_auth_check = true;
+
/**
* Max bytes a broker scanner can process in one broker load job.
* Commonly, each Backends has one broker scanner.
*/
@ConfField(mutable = true, masterOnly = true)
public static long max_bytes_per_broker_scanner = 3 * 1024 * 1024 * 1024L; // 3G
-
+
/**
* Max number of load jobs, including PENDING, ETL, LOADING, QUORUM_FINISHED.
* If exceed this number, load job is not allowed to be submitted.
*/
@ConfField(mutable = true, masterOnly = true)
public static long max_unfinished_load_job = 1000;
-
+
/**
* If set to true, Planner will try to select replica of tablet on same host as this Frontend.
* This may reduce network transmission in following case:
@@ -880,7 +955,7 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static boolean enable_local_replica_selection = false;
-
+
/**
* The timeout of executing async remote fragment.
* In normal case, the async remote fragment will be executed in a short time. If system are under high load
@@ -888,9 +963,9 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec
-
+
/**
- * The number of query retries.
+ * The number of query retries.
* A query may retry if we encounter RPC exception and no result has been sent to user.
* You may reduce this number to avoid Avalanche disaster.
*/
@@ -903,12 +978,12 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static long catalog_try_lock_timeout_ms = 5000; // 5 sec
-
+
/**
* if this is set to true
- * all pending load job will failed when call begin txn api
- * all prepare load job will failed when call commit txn api
- * all committed load job will waiting to be published
+ * all pending load jobs will fail when calling the begin txn api
+ * all prepared load jobs will fail when calling the commit txn api
+ * all committed load jobs will wait to be published
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean disable_load_job = false;
@@ -918,20 +993,20 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int db_used_data_quota_update_interval_secs = 300;
-
+
/**
* Load using hadoop cluster will be deprecated in future.
* Set to true to disable this kind of load.
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean disable_hadoop_load = false;
-
+
/**
* fe will call es api to get es index shard info every es_state_sync_interval_secs
*/
@ConfField
public static long es_state_sync_interval_second = 10;
-
+
/**
* the factor of delay time before deciding to repair tablet.
* if priority is VERY_HIGH, repair it immediately.
@@ -941,17 +1016,19 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static long tablet_repair_delay_factor_second = 60;
-
+
/**
* the default slot number per path in tablet scheduler
* TODO(cmy): remove this config and dynamically adjust it by clone task statistic
*/
- @ConfField public static int schedule_slot_num_per_path = 2;
-
+ @ConfField
+ public static int schedule_slot_num_per_path = 2;
+
/**
* Deprecated after 0.10
*/
- @ConfField public static boolean use_new_tablet_scheduler = true;
+ @ConfField
+ public static boolean use_new_tablet_scheduler = true;
/**
* the threshold of cluster balance score, if a backend's load score is 10% lower than average score,
@@ -990,11 +1067,12 @@ public class Config extends ConfigBase {
// 10000 replicas: 200ms
@ConfField(mutable = true, masterOnly = true)
public static int report_queue_size = 100;
-
+
/**
* If set to true, metric collector will be run as a daemon timer to collect metrics at fix interval
*/
- @ConfField public static boolean enable_metric_calculator = true;
+ @ConfField
+ public static boolean enable_metric_calculator = true;
/**
* the max routine load job num, including NEED_SCHEDULED, RUNNING, PAUSE
@@ -1018,13 +1096,13 @@ public class Config extends ConfigBase {
public static int max_routine_load_task_num_per_be = 5;
/**
- * The max number of files store in SmallFileMgr
+ * The max number of files stored in SmallFileMgr
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_small_file_number = 100;
/**
- * The max size of a single file store in SmallFileMgr
+ * The max size of a single file stored in SmallFileMgr
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_small_file_size_bytes = 1024 * 1024; // 1MB
@@ -1032,15 +1110,18 @@ public class Config extends ConfigBase {
/**
* Save small files
*/
- @ConfField public static String small_file_dir = PaloFe.DORIS_HOME_DIR + "/small_files";
-
+ @ConfField
+ public static String small_file_dir = PaloFe.DORIS_HOME_DIR + "/small_files";
+
/**
* The following 2 configs can be set to true to disable the automatic relocation and balancing of colocate tables.
* if 'disable_colocate_relocate' is set to true, ColocateTableBalancer will not relocate colocate tables when Backend unavailable.
* if 'disable_colocate_balance' is set to true, ColocateTableBalancer will not balance colocate tables.
*/
- @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_relocate = false;
- @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_balance = false;
+ @ConfField(mutable = true, masterOnly = true)
+ public static boolean disable_colocate_relocate = false;
+ @ConfField(mutable = true, masterOnly = true)
+ public static boolean disable_colocate_balance = false;
/**
* If set to true, the insert stmt with processing error will still return a label to user.
@@ -1048,14 +1129,15 @@ public class Config extends ConfigBase {
* The default value is false, which means if insert operation encounter errors,
* exception will be thrown to user client directly without load label.
*/
- @ConfField(mutable = true, masterOnly = true) public static boolean using_old_load_usage_pattern = false;
+ @ConfField(mutable = true, masterOnly = true)
+ public static boolean using_old_load_usage_pattern = false;
/**
* This will limit the max recursion depth of hash distribution pruner.
* eg: where a in (5 elements) and b in (4 elements) and c in (3 elements) and d in (2 elements).
* a/b/c/d are distribution columns, so the recursion depth will be 5 * 4 * 3 * 2 = 120, larger than 100,
* so the distribution pruner will not work and will just return all buckets.
- *
+ *
* Increase the depth can support distribution pruning for more elements, but may cost more CPU.
*/
@ConfField(mutable = true, masterOnly = false)
@@ -1077,10 +1159,10 @@ public class Config extends ConfigBase {
/**
* The multi cluster feature will be deprecated in version 0.12
* setting this config to true will disable all operations related to the cluster feature, including:
- * create/drop cluster
- * add free backend/add backend to cluster/decommission cluster balance
- * change the backends num of cluster
- * link/migration db
+ * create/drop cluster
+ * add free backend/add backend to cluster/decommission cluster balance
+ * change the backends num of cluster
+ * link/migration db
*/
@ConfField(mutable = true)
public static boolean disable_cluster_feature = true;
@@ -1156,7 +1238,7 @@ public class Config extends ConfigBase {
* This config will decide whether to resend agent task when create_time for agent_task is set,
* only when current_time - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task
*/
- @ConfField (mutable = true, masterOnly = true)
+ @ConfField(mutable = true, masterOnly = true)
public static long agent_task_resend_wait_time_ms = 5000;
/**
@@ -1172,10 +1254,10 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long max_clone_task_timeout_sec = 2 * 60 * 60; // 2h
- /**
+ /**
* If set to true, fe will enable sql result cache
* This option is suitable for offline data update scenarios
- * case1 case2 case3 case4
+ * case1 case2 case3 case4
* enable_sql_cache false true true false
* enable_partition_cache false false true true
*/
@@ -1190,8 +1272,8 @@ public class Config extends ConfigBase {
public static boolean cache_enable_partition_mode = true;
/**
- * Minimum interval between last version when caching results,
- * This parameter distinguishes between offline and real-time updates
+ * Minimum interval since the last version before results are cached.
+ * This parameter distinguishes between offline and real-time updates
*/
@ConfField(mutable = true, masterOnly = false)
public static int cache_last_version_interval_second = 900;
@@ -1201,7 +1283,7 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = false)
public static int cache_result_max_row_count = 3000;
-
+
/**
* Used to limit element num of InPredicate in delete statement.
*/
@@ -1218,4 +1300,30 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean recover_with_empty_tablet = false;
-}
+
+ /**
+ * enable_result_cache_ttl
+ * Whether the result cache TTL is enabled at the FE level; it can be overridden by a
+ * connection/session level setting in Context.
+ */
+ @ConfField(mutable = true)
+ public static boolean enable_result_cache_ttl = false;
+
+ /**
+ * Specify how long until a cache entry expires, in milliseconds. 10000 (10 seconds) by default.
+ */
+ @ConfField(mutable = true)
+ public static long result_cache_ttl_expire_after_in_milliseconds = 10 * 1000;
+
+ /**
+ * Specify the overall capacity of the local cache in bytes, 1 GB by default.
+ */
+ @ConfField(mutable = true)
+ public static long result_cache_ttl_size_in_bytes = 1024 * 1024 * 1024;
+
+ /**
+ * Max result size per query that may be cached, in bytes. 1 MB by default.
+ */
+ @ConfField(mutable = true)
+ public static long result_cache_ttl_size_per_query_in_bytes = 1024 * 1024;
+}
\ No newline at end of file
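Taken together, the four new knobs cover an on/off switch, a per-entry TTL, a total cache capacity, and a per-query result size cap. The CacheFactory implementation is not part of this hunk; purely as a sketch of how the TTL and size limits could map onto a cache, here is a Guava-based illustration (the class name and wiring are illustrative, not the PR's):

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.Weigher;

    import org.apache.doris.common.Config;

    public class ResultCacheSketch {
        // Entries expire after the configured TTL; total weight is capped in bytes.
        private final Cache<String, byte[]> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(Config.result_cache_ttl_expire_after_in_milliseconds,
                        TimeUnit.MILLISECONDS)
                .maximumWeight(Config.result_cache_ttl_size_in_bytes)
                .weigher((Weigher<String, byte[]>) (key, value) -> value.length)
                .recordStats()
                .build();

        public void putIfAllowed(String key, byte[] result) {
            // Honor the on/off switch and the per-query size cap before inserting.
            if (Config.enable_result_cache_ttl
                    && result.length <= Config.result_cache_ttl_size_per_query_in_bytes) {
                cache.put(key, result);
            }
        }
    }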
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 1705bb632ccd88..fa9be0e2ee83e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -37,13 +37,13 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
/*
- * if you want to visit the atrribute(such as queryID,defaultDb)
+ * if you want to visit the attribute (such as queryID, defaultDb)
* you can use profile.getInfoStrings("queryId")
* All attributes can be seen from the above.
- *
+ *
* why the element in the finished profile array is not RuntimeProfile,
- * the purpose is let coordinator can destruct earlier(the fragment profile is in Coordinator)
- *
+ * the purpose is to let the Coordinator be destructed earlier (the fragment profile is in the Coordinator)
+ *
*/
public class ProfileManager {
private static final Logger LOG = LogManager.getLogger(ProfileManager.class);
@@ -59,39 +59,40 @@ public class ProfileManager {
public static final String SQL_STATEMENT = "Sql Statement";
public static final String USER = "User";
public static final String DEFAULT_DB = "Default Db";
-
+ public static final String IS_CACHED = "IS Cached";
+
public static final ArrayList<String> PROFILE_HEADERS = new ArrayList<String>(
Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
- START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE));
-
+ START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE, IS_CACHED));
+
private class ProfileElement {
- public Map<String, String> infoStrings = Maps.newHashMap();
+ public Map<String, String> infoStrings = Maps.newHashMap();
public String profileContent;
}
-
+
// only protect profileDeque; profileMap is concurrent, no need to protect
- private ReentrantReadWriteLock lock;
+ private ReentrantReadWriteLock lock;
private ReadLock readLock;
private WriteLock writeLock;
private Deque<ProfileElement> profileDeque;
private Map<String, ProfileElement> profileMap; // from QueryId to RuntimeProfile
-
+
public static ProfileManager getInstance() {
if (INSTANCE == null) {
INSTANCE = new ProfileManager();
}
return INSTANCE;
}
-
+
private ProfileManager() {
- lock = new ReentrantReadWriteLock(true);
+ lock = new ReentrantReadWriteLock(true);
readLock = lock.readLock();
writeLock = lock.writeLock();
profileDeque = new LinkedList<ProfileElement>();
profileMap = new ConcurrentHashMap<String, ProfileElement>();
}
-
+
public ProfileElement createElement(RuntimeProfile profile) {
ProfileElement element = new ProfileElement();
RuntimeProfile summaryProfile = profile.getChildList().get(0).first;
@@ -101,12 +102,12 @@ public ProfileElement createElement(RuntimeProfile profile) {
element.profileContent = profile.toString();
return element;
}
-
+
public void pushProfile(RuntimeProfile profile) {
if (profile == null) {
return;
}
-
+
ProfileElement element = createElement(profile);
String queryId = element.infoStrings.get(ProfileManager.QUERY_ID);
// check when push in, which can ensure every element in the list has QUERY_ID column,
@@ -115,10 +116,10 @@ public void pushProfile(RuntimeProfile profile) {
LOG.warn("the key or value of Map is null, "
+ "may be forget to insert 'QUERY_ID' column into infoStrings");
}
-
+
profileMap.put(queryId, element);
writeLock.lock();
- try {
+ try {
if (profileDeque.size() >= ARRAY_SIZE) {
profileMap.remove(profileDeque.getFirst().infoStrings.get(QUERY_ID));
profileDeque.removeFirst();
@@ -128,7 +129,7 @@ public void pushProfile(RuntimeProfile profile) {
writeLock.unlock();
}
}
-
+
public List<List<String>> getAllQueries() {
List<List<String>> result = Lists.newArrayList();
readLock.lock();
@@ -137,9 +138,9 @@ public List<List<String>> getAllQueries() {
while (reverse.hasNext()) {
ProfileElement element = (ProfileElement) reverse.next();
Map<String, String> infoStrings = element.infoStrings;
-
+
List<String> row = Lists.newArrayList();
- for (String str : PROFILE_HEADERS ) {
+ for (String str : PROFILE_HEADERS) {
row.add(infoStrings.get(str));
}
result.add(row);
@@ -149,7 +150,7 @@ public List> getAllQueries() {
}
return result;
}
-
+
public String getProfile(String queryID) {
readLock.lock();
try {
@@ -157,7 +158,7 @@ public String getProfile(String queryID) {
if (element == null) {
return null;
}
-
+
return element.profileContent;
} finally {
readLock.unlock();
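pushProfile() above keeps profileMap and profileDeque consistent under the write lock, evicting the oldest profile once ARRAY_SIZE entries are queued. The same bounded insertion-order eviction pattern in isolation, as a small self-contained sketch (names are illustrative, not from this PR):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class BoundedProfileStore {
        private static final int CAPACITY = 100;

        private final Deque<String> order = new ArrayDeque<>();
        private final Map<String, String> byId = new ConcurrentHashMap<>();

        // Like ProfileManager, only the deque needs locking; the map is concurrent.
        public synchronized void push(String queryId, String content) {
            if (order.size() >= CAPACITY) {
                byId.remove(order.removeFirst()); // evict the oldest insertion
            }
            order.addLast(queryId);
            byId.put(queryId, content);
        }

        public String get(String queryId) {
            return byId.get(queryId);
        }
    }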
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/StringUtils.java
similarity index 63%
rename from fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
rename to fe/fe-core/src/main/java/org/apache/doris/common/util/StringUtils.java
index a66bc4f02d85aa..c382457f91b735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/StringUtils.java
@@ -15,22 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.metric;
+package org.apache.doris.common.util;
-public class GaugeMetricImpl<T> extends GaugeMetric<T> {
+import com.google.common.base.Preconditions;
- public GaugeMetricImpl(String name, MetricUnit unit, String description) {
- super(name, unit, description);
- }
-
- private T value;
-
- public void setValue(T v) {
- this.value = v;
- }
+import java.nio.charset.StandardCharsets;
- @Override
- public T getValue() {
- return value;
+/**
+ * Common String utilities for Doris
+ */
+public class StringUtils {
+ /**
+ * Get UTF-8 bytes from the input string
+ * @param str the string to encode; must not be null
+ * @return the UTF-8 encoded bytes
+ */
+ public static byte[] toUtf8(String str) {
+ Preconditions.checkNotNull(str, "Input String for UTF8 conversion is null!");
+ return str.getBytes(StandardCharsets.UTF_8);
}
-}
+}
\ No newline at end of file
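Usage is straightforward; the Preconditions check turns a null input into an immediate NullPointerException with a clear message instead of a later, harder-to-trace one:

    byte[] ascii = StringUtils.toUtf8("abc");  // 3 bytes
    byte[] cjk = StringUtils.toUtf8("中文");    // 6 bytes: 3 bytes per CJK character in UTF-8
    // StringUtils.toUtf8(null);               // throws NullPointerException via Preconditions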
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetric.java b/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetric.java
index 2e8d8191d20c00..80480b2abc7341 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetric.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetric.java
@@ -20,9 +20,27 @@
/*
* Gauge metric is updated every time it is visited
*/
-public abstract class GaugeMetric<T> extends Metric<T> {
-
+public class GaugeMetric<T> extends Metric<T> {
+ /**
+ * Construct an instance with the specified name, unit and description
+ *
+ * @param name metric name
+ * @param unit metric unit
+ * @param description metric description
+ */
public GaugeMetric(String name, MetricUnit unit, String description) {
super(name, MetricType.GAUGE, unit, description);
}
+
+ private T value;
+
+ public void setValue(T v) {
+ this.value = v;
+ }
+
+ @Override
+ public T getValue() {
+ return value;
+ }
}
+
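With GaugeMetric<T> made concrete and given a settable value, call sites can either instantiate it directly and push values into it (as the qps/rps gauges in MetricRepo now do), or keep subclassing it so the value is computed on every visit. A sketch of both styles (the heap gauge is an illustrative example, not from this PR):

    // Style 1: settable gauge, updated externally by a timer such as MetricCalculator.
    GaugeMetric<Double> qps = new GaugeMetric<>("qps", MetricUnit.NOUNIT, "query per second");
    qps.setValue(0.0);

    // Style 2: computed gauge, evaluated each time the metric is visited.
    GaugeMetric<Long> heapUsed = new GaugeMetric<Long>("jvm_heap_used", MetricUnit.BYTES,
            "heap bytes in use") {
        @Override
        public Long getValue() {
            return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        }
    };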
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
index 05aa38e79f7c3e..84ac07af5ffff0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
@@ -17,6 +17,9 @@
package org.apache.doris.metric;
+import org.apache.doris.cache.CacheFactory;
+import org.apache.doris.cache.CacheStats;
+
import java.util.List;
import java.util.TimerTask;
@@ -29,6 +32,7 @@ public class MetricCalculator extends TimerTask {
private long lastQueryCounter = -1;
private long lastRequestCounter = -1;
private long lastQueryErrCounter = -1;
+ private CacheStats priorStats = null;
@Override
public void run() {
@@ -65,6 +69,28 @@ private void update() {
MetricRepo.GAUGE_QUERY_ERR_RATE.setValue(errRate < 0 ? 0.0 : errRate);
lastQueryErrCounter = currentErrCounter;
+ // Cache stats
+ if (priorStats == null) {
+ priorStats = CacheFactory.getUniversalCache().getStats();
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_ERRORS.increase(priorStats.getNumErrors());
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_TIMEOUTS.increase(priorStats.getNumTimeouts());
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_EVICTIONS.increase(priorStats.getNumEvictions());
+ MetricRepo.GAUGE_RESULT_CACHE_TTL_SIZE_IN_BYTES.setValue(priorStats.getSizeInBytes());
+ MetricRepo.GAUGE_RESULT_CACHE_TTL_ENTRIES.setValue(priorStats.getNumEntries());
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_MISSES.increase(priorStats.getNumMisses());
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_HITS.increase(priorStats.getNumHits());
+ } else {
+ CacheStats currentStats = CacheFactory.getUniversalCache().getStats();
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_ERRORS.increase(deltaCounter(priorStats.getNumErrors(), currentStats.getNumErrors()));
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_TIMEOUTS.increase(deltaCounter(priorStats.getNumTimeouts(), currentStats.getNumTimeouts()));
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_EVICTIONS.increase(deltaCounter(priorStats.getNumEvictions(), currentStats.getNumEvictions()));
+ MetricRepo.GAUGE_RESULT_CACHE_TTL_SIZE_IN_BYTES.setValue(currentStats.getSizeInBytes());
+ MetricRepo.GAUGE_RESULT_CACHE_TTL_ENTRIES.setValue(currentStats.getNumEntries());
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_MISSES.increase(deltaCounter(priorStats.getNumMisses(), currentStats.getNumMisses()));
+ MetricRepo.COUNTER_RESULT_CACHE_TTL_HITS.increase(deltaCounter(priorStats.getNumHits(), currentStats.getNumHits()));
+ priorStats = currentStats;
+ }
+
lastTs = currentTs;
// max tablet compaction score of all backends
@@ -77,4 +103,15 @@ private void update() {
}
MetricRepo.GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(maxCompactionScore);
}
+
+ /**
+ * Compute the delta between two samples of a monotonically increasing counter,
+ * clamped at zero so that a stats reset cannot decrease the repo counter.
+ *
+ * @param prior the previous sample
+ * @param current the current sample
+ * @return the non-negative delta
+ */
+ private static long deltaCounter(long prior, long current) {
+ return Math.max(0L, current - prior);
+ }
}
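CacheStats exposes accumulated totals while the repo counters are increase-only, so the calculator feeds them per-interval deltas; clamping at zero means a stats reset (for example, the cache being re-created) cannot drive a counter backwards. Concretely:

    deltaCounter(100, 140); // 40: normal growth between two samples
    deltaCounter(140, 140); // 0: no activity in the interval
    deltaCounter(140, 25);  // 0, not -115: the stats were reset, so the sample is dropped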
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index e4e2b99f23a8a1..81e94cd0229402 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -51,7 +51,7 @@ public final class MetricRepo {
private static final MetricRegistry METRIC_REGISTER = new MetricRegistry();
private static final DorisMetricRegistry PALO_METRIC_REGISTER = new DorisMetricRegistry();
-
+
public static volatile boolean isInit = false;
public static final SystemMetrics SYSTEM_METRICS = new SystemMetrics();
@@ -84,14 +84,23 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
+ // Metrics for the result cache
+ public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_HITS;
+ public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_MISSES;
+ public static GaugeMetric<Long> GAUGE_RESULT_CACHE_TTL_ENTRIES;
+ public static GaugeMetric<Long> GAUGE_RESULT_CACHE_TTL_SIZE_IN_BYTES;
+ public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_EVICTIONS;
+ public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_TIMEOUTS;
+ public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_ERRORS;
+
public static Histogram HISTO_QUERY_LATENCY;
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
// following metrics will be updated by metric calculator
- public static GaugeMetricImpl<Double> GAUGE_QUERY_PER_SECOND;
- public static GaugeMetricImpl<Double> GAUGE_REQUEST_PER_SECOND;
- public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
- public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
+ public static GaugeMetric<Double> GAUGE_QUERY_PER_SECOND;
+ public static GaugeMetric<Double> GAUGE_REQUEST_PER_SECOND;
+ public static GaugeMetric<Double> GAUGE_QUERY_ERR_RATE;
+ public static GaugeMetric<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true);
private static MetricCalculator metricCalculator = new MetricCalculator();
@@ -117,8 +126,8 @@ public Long getValue() {
}
};
gauge.addLabel(new MetricLabel("job", "load"))
- .addLabel(new MetricLabel("type", jobType.name()))
- .addLabel(new MetricLabel("state", state.name()));
+ .addLabel(new MetricLabel("type", jobType.name()))
+ .addLabel(new MetricLabel("state", state.name()));
PALO_METRIC_REGISTER.addPaloMetrics(gauge);
}
}
@@ -129,7 +138,7 @@ public Long getValue() {
if (jobType != JobType.SCHEMA_CHANGE && jobType != JobType.ROLLUP) {
continue;
}
-
+
GaugeMetric<Long> gauge = (GaugeMetric<Long>) new GaugeMetric<Long>("job",
MetricUnit.NOUNIT, "job statistics") {
@Override
@@ -145,8 +154,8 @@ public Long getValue() {
}
};
gauge.addLabel(new MetricLabel("job", "alter"))
- .addLabel(new MetricLabel("type", jobType.name()))
- .addLabel(new MetricLabel("state", "running"));
+ .addLabel(new MetricLabel("type", jobType.name()))
+ .addLabel(new MetricLabel("state", "running"));
PALO_METRIC_REGISTER.addPaloMetrics(gauge);
}
@@ -192,16 +201,16 @@ public Long getValue() {
// qps, rps and error rate
// these metrics should be set an init value, in case that metric calculator is not running
- GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps", MetricUnit.NOUNIT, "query per second");
+ GAUGE_QUERY_PER_SECOND = new GaugeMetric<>("qps", MetricUnit.NOUNIT, "query per second");
GAUGE_QUERY_PER_SECOND.setValue(0.0);
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_PER_SECOND);
- GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", MetricUnit.NOUNIT, "request per second");
+ GAUGE_REQUEST_PER_SECOND = new GaugeMetric<>("rps", MetricUnit.NOUNIT, "request per second");
GAUGE_REQUEST_PER_SECOND.setValue(0.0);
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_REQUEST_PER_SECOND);
- GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", MetricUnit.NOUNIT, "query error rate");
+ GAUGE_QUERY_ERR_RATE = new GaugeMetric<>("query_err_rate", MetricUnit.NOUNIT, "query error rate");
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_ERR_RATE);
GAUGE_QUERY_ERR_RATE.setValue(0.0);
- GAUGE_MAX_TABLET_COMPACTION_SCORE = new GaugeMetricImpl<>("max_tablet_compaction_score",
+ GAUGE_MAX_TABLET_COMPACTION_SCORE = new GaugeMetric<>("max_tablet_compaction_score",
MetricUnit.NOUNIT, "max tablet compaction score of all backends");
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE);
GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L);
@@ -224,17 +233,17 @@ public Long getValue() {
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_MODE_SQL);
COUNTER_CACHE_HIT_SQL = new LongCounterMetric("cache_hit_sql", MetricUnit.REQUESTS, "total hits query by sql model");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_HIT_SQL);
- COUNTER_CACHE_MODE_PARTITION = new LongCounterMetric("query_mode_partition", MetricUnit.REQUESTS,
- "total query of partition mode");
+ COUNTER_CACHE_MODE_PARTITION = new LongCounterMetric("query_mode_partition", MetricUnit.REQUESTS,
+ "total query of partition mode");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_MODE_PARTITION);
- COUNTER_CACHE_HIT_PARTITION = new LongCounterMetric("cache_hit_partition", MetricUnit.REQUESTS,
- "total hits query by partition model");
+ COUNTER_CACHE_HIT_PARTITION = new LongCounterMetric("cache_hit_partition", MetricUnit.REQUESTS,
+ "total hits query by partition model");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_HIT_PARTITION);
- COUNTER_CACHE_PARTITION_ALL = new LongCounterMetric("partition_all", MetricUnit.REQUESTS,
- "scan partition of cache partition model");
+ COUNTER_CACHE_PARTITION_ALL = new LongCounterMetric("partition_all", MetricUnit.REQUESTS,
+ "scan partition of cache partition model");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_PARTITION_ALL);
- COUNTER_CACHE_PARTITION_HIT = new LongCounterMetric("partition_hit", MetricUnit.REQUESTS,
- "hit partition of cache partition model");
+ COUNTER_CACHE_PARTITION_HIT = new LongCounterMetric("partition_hit", MetricUnit.REQUESTS,
+ "hit partition of cache partition model");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_PARTITION_HIT);
COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", MetricUnit.REQUESTS, "total load finished");
@@ -269,6 +278,21 @@ public Long getValue() {
"total error rows of routine load");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
+ COUNTER_RESULT_CACHE_TTL_HITS = new LongCounterMetric("result_cache_ttl_hits", MetricUnit.NOUNIT,
+ "accumulated number of cache hits");
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_HITS);
+ COUNTER_RESULT_CACHE_TTL_MISSES = new LongCounterMetric("result_cache_ttl_misses", MetricUnit.NOUNIT,
+ "accumulated number of cache misses");
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_MISSES);
+ GAUGE_RESULT_CACHE_TTL_ENTRIES = new GaugeMetric("result_cache_ttl_entries", MetricUnit.NOUNIT,
+ "current number of entries in the cache");
+ PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_RESULT_CACHE_TTL_ENTRIES);
+ GAUGE_RESULT_CACHE_TTL_SIZE_IN_BYTES = new GaugeMetric("result_cache_ttl_size_in_bytes", MetricUnit.BYTES,
+ "current cache size in bytes");
+ PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_RESULT_CACHE_TTL_SIZE_IN_BYTES);
+ COUNTER_RESULT_CACHE_TTL_EVICTIONS = new LongCounterMetric("result_cache_ttl_evictions", MetricUnit.NOUNIT,
+ "accumulated number of cache evictions");
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_EVICTIONS);
+ COUNTER_RESULT_CACHE_TTL_TIMEOUTS = new LongCounterMetric("result_cache_ttl_timeouts", MetricUnit.NOUNIT,
+ "accumulated number of cache timeouts");
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_TIMEOUTS);
+ COUNTER_RESULT_CACHE_TTL_ERRORS = new LongCounterMetric("result_cache_ttl_errors", MetricUnit.NOUNIT,
+ "accumulated number of cache errors");
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_ERRORS);
+
// 3. histogram
HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms"));
HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("editlog", "write", "latency", "ms"));
@@ -404,7 +428,7 @@ public static synchronized String getMetric(MetricVisitor visitor) {
for (Map.Entry entry : histograms.entrySet()) {
visitor.visitHistogram(sb, entry.getKey(), entry.getValue());
}
-
+
// node info
visitor.getNodeInfo(sb);
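
The result_cache_ttl_* metrics above are only registered here; the cache implementation is expected to drive them. A rough sketch of how a get path might feed the hit/miss counters (assumption: LongCounterMetric exposes the same increase(Long) used by the other counters in this file):

    // Hypothetical instrumentation around the TTL cache's get(); metric names as above.
    import org.apache.doris.cache.Cache;
    import org.apache.doris.metric.MetricRepo;

    public final class CacheMetricsHelper {
        private CacheMetricsHelper() {}

        public static byte[] getWithMetrics(Cache cache, Cache.NamedKey key) {
            final byte[] value = cache.get(key);
            if (value != null) {
                MetricRepo.COUNTER_RESULT_CACHE_TTL_HITS.increase(1L);
            } else {
                MetricRepo.COUNTER_RESULT_CACHE_TTL_MISSES.increase(1L);
            }
            return value;
        }
    }
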
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index fa7ba25d394518..afb73e1cc6cb80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -248,6 +248,7 @@ private void writeBuffer(ByteBuffer buffer) throws IOException {
public void sendOnePacket(ByteBuffer packet) throws IOException {
int bufLen;
int oldLimit = packet.limit();
+ packet.mark();
while (oldLimit - packet.position() >= MAX_PHYSICAL_PACKET_LENGTH) {
bufLen = MAX_PHYSICAL_PACKET_LENGTH;
packet.limit(packet.position() + bufLen);
@@ -259,6 +260,8 @@ public void sendOnePacket(ByteBuffer packet) throws IOException {
packet.limit(oldLimit);
writeBuffer(packet);
accSequenceId();
+ // Restore the buffer to the state it was in before the read, since we may cache the packet for future use.
+ packet.reset();
}
public void sendAndFlush(ByteBuffer packet) throws IOException {
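
The mark()/reset() pair added to sendOnePacket matters because writing advances the packet's position; if the same ByteBuffer is later serialized into the result cache, it would otherwise appear fully consumed. A minimal, standalone illustration of the JDK semantics being relied on:

    import java.nio.ByteBuffer;

    public class MarkResetDemo {
        public static void main(String[] args) {
            ByteBuffer packet = ByteBuffer.wrap("hello".getBytes());
            packet.mark();                           // remember the current position
            while (packet.hasRemaining()) {
                packet.get();                        // reading moves position to the limit
            }
            packet.reset();                          // restore the marked position
            System.out.println(packet.remaining());  // prints 5: the packet is re-readable
        }
    }
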
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java
index 1fb335cd379fe8..f6dce08765dc33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java
@@ -51,6 +51,7 @@ public boolean isForceDrop() {
return forceDrop;
}
+ @Deprecated
private void readFields(DataInput in) throws IOException {
dbName = Text.readString(in);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
index f10b8d274897d3..91669cc8f55139 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
@@ -72,6 +72,7 @@ public boolean isForceDrop() {
return forceDrop;
}
+ @Deprecated
private void readFields(DataInput in) throws IOException {
dbId = in.readLong();
tableId = in.readLong();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
index 303212353e2e98..f75ad132816fd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java
@@ -87,6 +87,7 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, json);
}
+ @Deprecated
private void readFields(DataInput in) throws IOException {
dbId = in.readLong();
tblId = in.readLong();
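
The @Deprecated tags on these readFields methods follow the FE's persistence migration: write() now emits the whole object as a single JSON string, and readFields is kept only so images written before the cut-over meta version can still be replayed. A minimal sketch of the pattern (assumption: plain GSON stands in for the FE's own JSON utilities):

    import com.google.gson.Gson;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    class DropInfoSketch {
        String dbName;

        void write(DataOutput out) throws IOException {
            out.writeUTF(new Gson().toJson(this));   // new format: one JSON string
        }

        static DropInfoSketch read(DataInput in) throws IOException {
            return new Gson().fromJson(in.readUTF(), DropInfoSketch.class);
        }
    }
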
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
index b6dfce96ecead0..19b6a7c3c36635 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
@@ -20,9 +20,12 @@
import org.apache.doris.proto.PQueryStatistics;
import org.apache.doris.thrift.TResultBatch;
-public final class RowBatch {
+import java.io.Serializable;
+
+public final class RowBatch implements Serializable {
private TResultBatch batch;
- private PQueryStatistics statistics;
+ // Marked transient: statistics are per-execution and must not be cached with the serialized batch.
+ private transient PQueryStatistics statistics;
private boolean eos;
public RowBatch() {
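
Because statistics is transient, a RowBatch that round-trips through the cache keeps its result rows but comes back with statistics == null, so consumers of a cached batch must tolerate missing statistics. The behavior can be checked with the same commons-lang helper the executor uses (a sketch; the real TResultBatch payload is replaced by a String here):

    import org.apache.commons.lang.SerializationUtils;

    import java.io.Serializable;

    public class TransientDemo {
        static class BatchSketch implements Serializable {
            String payload = "rows";
            transient String statistics = "per-execution stats";
        }

        public static void main(String[] args) {
            byte[] bytes = SerializationUtils.serialize(new BatchSketch());
            BatchSketch copy = (BatchSketch) SerializationUtils.deserialize(bytes);
            System.out.println(copy.payload);     // "rows" survives the round trip
            System.out.println(copy.statistics);  // null: transient fields are skipped
        }
    }
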
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d9e72413b668cc..5edcf48d1d3f60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -37,8 +37,8 @@
// System variable
public class SessionVariable implements Serializable, Writable {
-
- static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
+
+ static final Logger LOG = LogManager.getLogger(SessionVariable.class);
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String QUERY_TIMEOUT = "query_timeout";
public static final String IS_REPORT_SUCCESS = "is_report_success";
@@ -67,7 +67,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String NET_BUFFER_LENGTH = "net_buffer_length";
public static final String CODEGEN_LEVEL = "codegen_level";
// mem limit can't smaller than bufferpool's default page size
- public static final int MIN_EXEC_MEM_LIMIT = 2097152;
+ public static final int MIN_EXEC_MEM_LIMIT = 2097152;
public static final String BATCH_SIZE = "batch_size";
public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join";
@@ -75,9 +75,10 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String ENABLE_SPILLING = "enable_spilling";
public static final String PREFER_JOIN_METHOD = "prefer_join_method";
-
+
public static final String ENABLE_SQL_CACHE = "enable_sql_cache";
public static final String ENABLE_PARTITION_CACHE = "enable_partition_cache";
+ public static final String ENABLE_RESULT_CACHE_TTL = "enable_result_cache_ttl";
public static final int MIN_EXEC_INSTANCE_NUM = 1;
public static final int MAX_EXEC_INSTANCE_NUM = 32;
@@ -86,7 +87,7 @@ public class SessionVariable implements Serializable, Writable {
// user can set instance num after exchange, no need to be equal to nums of before exchange
public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num";
/*
- * configure the mem limit of load process on BE.
+ * configure the mem limit of load process on BE.
* Previously users used exec_mem_limit to set memory limits.
* To maintain compatibility, the default value of load_mem_limit is 0,
* which means that the load memory limit is still using exec_mem_limit.
@@ -233,6 +234,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PARTITION_CACHE)
private boolean enablePartitionCache = false;
+ @VariableMgr.VarAttr(name = ENABLE_RESULT_CACHE_TTL)
+ private boolean enableResultCache = false;
+
@VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
private boolean forwardToMaster = false;
@@ -411,9 +415,13 @@ public boolean isDisableColocateJoin() {
return disableColocateJoin;
}
- public String getPreferJoinMethod() {return preferJoinMethod; }
+ public String getPreferJoinMethod() {
+ return preferJoinMethod;
+ }
- public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; }
+ public void setPreferJoinMethod(String preferJoinMethod) {
+ this.preferJoinMethod = preferJoinMethod;
+ }
public int getParallelExecInstanceNum() {
return parallelExecInstanceNum;
@@ -423,7 +431,9 @@ public int getExchangeInstanceParallel() {
return exchangeInstanceParallel;
}
- public boolean getEnableInsertStrict() { return enableInsertStrict; }
+ public boolean getEnableInsertStrict() {
+ return enableInsertStrict;
+ }
public void setEnableInsertStrict(boolean enableInsertStrict) {
this.enableInsertStrict = enableInsertStrict;
@@ -444,13 +454,33 @@ public boolean isEnablePartitionCache() {
public void setEnablePartitionCache(boolean enablePartitionCache) {
this.enablePartitionCache = enablePartitionCache;
}
-
+
+ /**
+ * Check whether the result cache is enabled for this session. Disabled by default.
+ *
+ * @return true if the result cache is enabled, otherwise false.
+ */
+ public boolean isEnableResultCache() {
+ return enableResultCache;
+ }
+
+ /**
+ * Turn the result cache on or off for this session.
+ *
+ * @param resultCacheEnabledInSession whether the result cache should be enabled for this session
+ */
+ public void setEnableResultCache(boolean resultCacheEnabledInSession) {
+ this.enableResultCache = resultCacheEnabledInSession;
+ }
+
// Serialize to thrift object
public boolean getForwardToMaster() {
return forwardToMaster;
}
- public boolean isUseV2Rollup() { return useV2Rollup; }
+ public boolean isUseV2Rollup() {
+ return useV2Rollup;
+ }
// for unit test
public void setUseV2Rollup(boolean useV2Rollup) {
@@ -578,51 +608,57 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, root.toString());
}
+ @Deprecated
+ private void readFromStream(DataInput in) throws IOException {
+ codegenLevel = in.readInt();
+ netBufferLength = in.readInt();
+ sqlSafeUpdates = in.readInt();
+ timeZone = Text.readString(in);
+ netReadTimeout = in.readInt();
+ netWriteTimeout = in.readInt();
+ waitTimeout = in.readInt();
+ interactiveTimeout = in.readInt();
+ queryCacheType = in.readInt();
+ autoIncrementIncrement = in.readInt();
+ maxAllowedPacket = in.readInt();
+ sqlSelectLimit = in.readLong();
+ sqlAutoIsNull = in.readBoolean();
+ collationDatabase = Text.readString(in);
+ collationConnection = Text.readString(in);
+ charsetServer = Text.readString(in);
+ charsetResults = Text.readString(in);
+ charsetConnection = Text.readString(in);
+ charsetClient = Text.readString(in);
+ txIsolation = Text.readString(in);
+ autoCommit = in.readBoolean();
+ resourceGroup = Text.readString(in);
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_65) {
+ sqlMode = in.readLong();
+ } else {
+ // read old version SQL mode
+ Text.readString(in);
+ sqlMode = 0L;
+ }
+ isReportSucc = in.readBoolean();
+ queryTimeoutS = in.readInt();
+ maxExecMemByte = in.readLong();
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_37) {
+ collationServer = Text.readString(in);
+ }
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_38) {
+ batchSize = in.readInt();
+ disableStreamPreaggregations = in.readBoolean();
+ parallelExecInstanceNum = in.readInt();
+ }
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) {
+ exchangeInstanceParallel = in.readInt();
+ }
+ }
+
public void readFields(DataInput in) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_67) {
- codegenLevel = in.readInt();
- netBufferLength = in.readInt();
- sqlSafeUpdates = in.readInt();
- timeZone = Text.readString(in);
- netReadTimeout = in.readInt();
- netWriteTimeout = in.readInt();
- waitTimeout = in.readInt();
- interactiveTimeout = in.readInt();
- queryCacheType = in.readInt();
- autoIncrementIncrement = in.readInt();
- maxAllowedPacket = in.readInt();
- sqlSelectLimit = in.readLong();
- sqlAutoIsNull = in.readBoolean();
- collationDatabase = Text.readString(in);
- collationConnection = Text.readString(in);
- charsetServer = Text.readString(in);
- charsetResults = Text.readString(in);
- charsetConnection = Text.readString(in);
- charsetClient = Text.readString(in);
- txIsolation = Text.readString(in);
- autoCommit = in.readBoolean();
- resourceGroup = Text.readString(in);
- if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_65) {
- sqlMode = in.readLong();
- } else {
- // read old version SQL mode
- Text.readString(in);
- sqlMode = 0L;
- }
- isReportSucc = in.readBoolean();
- queryTimeoutS = in.readInt();
- maxExecMemByte = in.readLong();
- if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_37) {
- collationServer = Text.readString(in);
- }
- if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_38) {
- batchSize = in.readInt();
- disableStreamPreaggregations = in.readBoolean();
- parallelExecInstanceNum = in.readInt();
- }
- if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) {
- exchangeInstanceParallel = in.readInt();
- }
+ readFromStream(in);
} else {
readFromJson(in);
}
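
Since enable_result_cache_ttl is an ordinary session variable, clients opt in per connection with a plain SET statement and no FE restart. A hypothetical JDBC session against the FE's MySQL endpoint (host, port, and table are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class EnableResultCacheDemo {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://fe-host:9030/demo_db", "root", "");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("SET enable_result_cache_ttl = true");
                // Identical statements on this connection may now be answered
                // from the TTL result cache instead of re-executing.
                stmt.executeQuery("SELECT count(*) FROM t");
                stmt.executeQuery("SELECT count(*) FROM t");
            }
        }
    }
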
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 14a811a3417ed0..f1ae3423a67eb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -36,6 +36,8 @@
import org.apache.doris.analysis.StmtRewriter;
import org.apache.doris.analysis.UnsupportedStmt;
import org.apache.doris.analysis.UseStmt;
+import org.apache.doris.cache.Cache;
+import org.apache.doris.cache.CacheFactory;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -55,6 +57,7 @@
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.common.util.StringUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.metric.MetricRepo;
@@ -81,17 +84,22 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.commons.lang.SerializationUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.doris.cache.Cache.NamedKey;
+
// Do one COM_QEURY process.
// first: Parse receive byte array to statement struct.
// second: Do handle function for statement.
@@ -114,6 +122,7 @@ public class StmtExecutor {
private boolean isProxy;
private ShowResultSet proxyResultSet = null;
private PQueryStatistics statisticsForAuditLog;
+ private boolean isCached;
// this constructor is mainly for proxy
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
@@ -155,6 +164,8 @@ public void initProfile(long beginTimeInNanoSecond) {
summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase());
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
+ // Record in the profile summary whether this query was answered from the result cache.
+ summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No");
profile.addChild(summaryProfile);
if (coord != null) {
coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
@@ -243,15 +254,14 @@ public void execute() throws Exception {
if (parsedStmt instanceof QueryStmt) {
context.getState().setIsQuery(true);
int retryTime = Config.max_query_retry_time;
- for (int i = 0; i < retryTime; i ++) {
+ for (int i = 0; i < retryTime; i++) {
try {
//reset query id for each retry
if (i > 0) {
uuid = UUID.randomUUID();
context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
}
- handleQueryStmt();
- if (context.getSessionVariable().isReportSucc()) {
+ if (!handleQueryStmt() && context.getSessionVariable().isReportSucc()) {
writeProfile(beginTimeInNanoSecond);
}
break;
@@ -394,7 +404,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException {
LOG.info("analysis exception happened when parsing stmt {}, id: {}, error: {}",
originStmt, context.getStmtId(), syntaxError, e);
if (syntaxError == null) {
- throw e;
+ throw e;
} else {
throw new AnalysisException(syntaxError, e);
}
@@ -413,7 +423,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException {
if (isForwardToMaster()) {
return;
}
-
+
analyzer = new Analyzer(context.getCatalog(), context);
// Convert show statement to select statement here
if (parsedStmt instanceof ShowStmt) {
@@ -493,7 +503,7 @@ private void analyzeAndGenerateQueryPlan(TQueryOptions tQueryOptions) throws Use
// types and column labels to restore them after the rewritten stmt has been
// reset() and re-analyzed.
List origResultTypes = Lists.newArrayList();
- for (Expr e: parsedStmt.getResultExprs()) {
+ for (Expr e : parsedStmt.getResultExprs()) {
origResultTypes.add(e.getType());
}
List origColLabels =
@@ -558,7 +568,7 @@ private void handleKill() throws DdlException {
// Only user itself and user with admin priv can kill connection
if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser())
&& !Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(),
- PrivPredicate.ADMIN)) {
+ PrivPredicate.ADMIN)) {
ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id);
}
@@ -581,32 +591,59 @@ private void handleSetStmt() {
context.getState().setOk();
}
- // Process a select statement.
- private void handleQueryStmt() throws Exception {
+ /**
+ * Process a select statement.
+ *
+ * @return true if the query result was served from the result cache, otherwise false.
+ */
+ private boolean handleQueryStmt() throws Exception {
// Every time set no send flag and clean all data in buffer
context.getMysqlChannel().reset();
QueryStmt queryStmt = (QueryStmt) parsedStmt;
+ // Use the connection ID as the key namespace, scoping cached results to this session.
+ NamedKey namedKey = new NamedKey(String.valueOf(context.getConnectionId()),
+ StringUtils.toUtf8(originStmt.originStmt));
+ LOG.debug("Result Cache NamedKey [{}]", namedKey);
+
QueryDetail queryDetail = new QueryDetail(context.getStartTime(),
- DebugUtil.printId(context.queryId()),
- context.getStartTime(), -1, -1,
- QueryDetail.QueryMemState.RUNNING,
- context.getDatabase(),
- originStmt.originStmt);
+ DebugUtil.printId(context.queryId()),
+ context.getStartTime(), -1, -1,
+ QueryDetail.QueryMemState.RUNNING,
+ context.getDatabase(),
+ originStmt.originStmt);
context.setQueryDetail(queryDetail);
QueryDetailQueue.addOrUpdateQueryDetail(queryDetail);
if (queryStmt.isExplain()) {
- String explainString = planner.getExplainString(planner.getFragments(), queryStmt.isVerbose() ? TExplainLevel.VERBOSE: TExplainLevel.NORMAL.NORMAL);
+ String explainString = planner.getExplainString(planner.getFragments(), queryStmt.isVerbose() ? TExplainLevel.VERBOSE : TExplainLevel.NORMAL);
handleExplainStmt(explainString);
- return;
+ return false;
}
coord = new Coordinator(context, analyzer, planner);
- QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
+ QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
- coord.exec();
+ boolean isCacheEnabled = context.getSessionVariable().isEnableResultCache();
+ LOG.debug("Session level cache is {}", (isCacheEnabled ? "enabled" : false));
+ Cache cache = null;
+ byte[] cachedVal = null;
+ ArrayList<RowBatch> batches = null;
+ if (isCacheEnabled) {
+ cache = CacheFactory.getUniversalCache();
+ cachedVal = cache.get(namedKey);
+ }
+
+ isCached = (cachedVal != null);
+ if (isCached) {
+ batches = (ArrayList<RowBatch>) SerializationUtils.deserialize(cachedVal);
+ // Guard against an unusable cached payload before skipping execution.
+ if (batches == null) {
+ isCached = false;
+ LOG.info("failed to deserialize cached batches, falling back to normal execution");
+ }
+ }
+ if (!isCached) {
+ coord.exec();
+ if (isCacheEnabled) {
+ // java.util.List does not extend Serializable, so collect into a concrete ArrayList.
+ batches = new ArrayList<>();
+ }
+ }
// if python's MysqlDb get error after sendfields, it can't catch the exception
// so We need to send fields after first batch arrived
@@ -625,19 +662,38 @@ private void handleQueryStmt() throws Exception {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
}
+ Iterator<RowBatch> itr = (isCached) ? batches.iterator() : null;
while (true) {
- batch = coord.getNext();
+ if (isCached) {
+ // The cached iterator must yield a batch carrying the EOS flag before it is
+ // exhausted; otherwise the cached result is corrupted.
+ batch = itr.next();
+ } else {
+ batch = coord.getNext();
+ if (isCacheEnabled) {
+ batches.add(batch);
+ }
+ }
// for outfile query, there will be only one empty batch send back with eos flag
if (batch.getBatch() != null && !isOutfileQuery) {
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
- }
- context.updateReturnRows(batch.getBatch().getRows().size());
+ }
+ context.updateReturnRows(batch.getBatch().getRows().size());
}
if (batch.isEos()) {
break;
}
}
+ if (cachedVal == null && isCacheEnabled) {
+ cachedVal = SerializationUtils.serialize(batches);
+ cache.put(namedKey, cachedVal);
+ LOG.debug("Put into cache with named key: " + namedKey);
+ }
statisticsForAuditLog = batch.getQueryStatistics();
if (!isOutfileQuery) {
@@ -645,6 +701,7 @@ private void handleQueryStmt() throws Exception {
} else {
context.getState().setOk(statisticsForAuditLog.returned_rows, 0, "");
}
+ return isCached;
}
// Process a select statement.
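
Stripped of the MySQL-channel plumbing, the cache interaction in handleQueryStmt is a read-through pattern: look up the serialized batches, and on a miss execute the query, collect the batches, and store them. A condensed sketch of that shape (assumption: ByteCache mirrors the get/put byte[] interface used above):

    import org.apache.commons.lang.SerializationUtils;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.function.Supplier;

    public final class ReadThroughSketch {
        interface ByteCache {
            byte[] get(Object key);
            void put(Object key, byte[] value);
        }

        static <T extends Serializable> ArrayList<T> fetch(
                ByteCache cache, Object key, Supplier<ArrayList<T>> runQuery) {
            final byte[] cached = cache.get(key);
            if (cached != null) {
                @SuppressWarnings("unchecked")
                ArrayList<T> hit = (ArrayList<T>) SerializationUtils.deserialize(cached);
                return hit;                              // hit: skip execution entirely
            }
            final ArrayList<T> result = runQuery.get();  // miss: execute the query
            cache.put(key, SerializationUtils.serialize(result));
            return result;
        }
    }
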
@@ -884,6 +941,7 @@ public void sendShowResult(ShowResultSet resultSet) throws IOException {
context.getState().setEof();
}
+
// Process show statement
private void handleShow() throws IOException, AnalysisException, DdlException {
ShowExecutor executor = new ShowExecutor(context, (ShowStmt) parsedStmt);