From 6562e1cc39ba89b877e854fc2b10fcf26988f839 Mon Sep 17 00:00:00 2001 From: turbo jason Date: Fri, 7 Aug 2020 11:17:55 +0800 Subject: [PATCH 1/7] [Apache][Doris][Feature][FE]Sql Result Cache --- fe/fe-core/pom.xml | 6 + .../main/java/org/apache/doris/PaloFe.java | 18 +- .../java/org/apache/doris/cache/Cache.java | 84 ++++++++++ .../doris/cache/CacheExecutorFactory.java | 49 ++++++ .../org/apache/doris/cache/CacheFactory.java | 41 +++++ .../org/apache/doris/cache/CacheStats.java | 132 +++++++++++++++ .../org/apache/doris/cache/GlobalCache.java | 56 +++++++ .../org/apache/doris/cache/HybridCache.java | 56 +++++++ .../apache/doris/cache/SimpleLocalCache.java | 158 ++++++++++++++++++ .../java/org/apache/doris/common/Config.java | 18 ++ .../apache/doris/common/FeMetaVersion.java | 4 +- .../doris/common/util/ProfileManager.java | 47 +++--- .../util/StringUtils.java} | 31 ++-- .../org/apache/doris/metric/GaugeMetric.java | 22 ++- .../apache/doris/metric/MetricCalculator.java | 37 ++++ .../org/apache/doris/metric/MetricRepo.java | 70 +++++--- .../org/apache/doris/mysql/MysqlChannel.java | 3 + .../java/org/apache/doris/qe/RowBatch.java | 7 +- .../org/apache/doris/qe/SessionVariable.java | 51 +++++- .../org/apache/doris/qe/StmtExecutor.java | 95 ++++++++--- 20 files changed, 884 insertions(+), 101 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cache/Cache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cache/CacheExecutorFactory.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cache/CacheFactory.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cache/CacheStats.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cache/GlobalCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cache/HybridCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java rename fe/fe-core/src/main/java/org/apache/doris/{metric/GaugeMetricImpl.java => common/util/StringUtils.java} (63%) diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 06de633d94e32f..0a8f3c5414360d 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -454,6 +454,12 @@ under the License. spark-sql_2.12 provided + + + com.github.ben-manes.caffeine + caffeine + 2.6.2 + diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java index 5458b88269fff4..de71acfe11ce87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java +++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java @@ -17,6 +17,7 @@ package org.apache.doris; +import org.apache.doris.cache.CacheFactory; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.CommandLineOptions; import org.apache.doris.common.Config; @@ -91,7 +92,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args) { Log4jConfig.initLogging(); // set dns cache ttl - java.security.Security.setProperty("networkaddress.cache.ttl" , "60"); + java.security.Security.setProperty("networkaddress.cache.ttl", "60"); // check command line options checkCommandLineOptions(cmdLineOpts); @@ -104,6 +105,11 @@ public static void start(String dorisHomeDir, String pidDir, String[] args) { // init catalog and wait it be ready Catalog.getCurrentCatalog().initialize(args); Catalog.getCurrentCatalog().waitForReady(); + // Initialize the result cache if enabled + LOG.debug("result cache is " + (Config.enable_result_cache ? "enabled" : "disabled")); + if (Config.enable_result_cache) { + CacheFactory.getUniversalCache(); + } // init and start: // 1. QeService for MySQL Server @@ -141,12 +147,12 @@ public static void start(String dorisHomeDir, String pidDir, String[] args) { * Specify the helper node when joining a bdb je replication group * -b --bdb * Run bdbje debug tools - * + * * -l --listdb * List all database names in bdbje * -d --db * Specify a database in bdbje - * + * * -s --stat * Print statistic of a database, including count, first key, last key * -f --from @@ -155,7 +161,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args) { * Specify the end scan key * -m --metaversion * Specify the meta version to decode log value - * + * */ private static CommandLineOptions parseArgs(String[] args) { CommandLineParser commandLineParser = new BasicParser(); @@ -194,7 +200,7 @@ private static CommandLineOptions parseArgs(String[] args) { System.err.println("BDBJE database name is missing"); System.exit(-1); } - + if (cmd.hasOption('s') || cmd.hasOption("stat")) { BDBToolOptions bdbOpts = new BDBToolOptions(false, dbName, true, "", "", 0); return new CommandLineOptions(false, "", bdbOpts); @@ -224,7 +230,7 @@ private static CommandLineOptions parseArgs(String[] args) { System.exit(-1); } } - + BDBToolOptions bdbOpts = new BDBToolOptions(false, dbName, false, fromKey, endKey, metaVersion); return new CommandLineOptions(false, "", bdbOpts); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/Cache.java b/fe/fe-core/src/main/java/org/apache/doris/cache/Cache.java new file mode 100644 index 00000000000000..118bb57289ed2a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/Cache.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cache; + +import com.google.common.base.Preconditions; + +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Map; + +public interface Cache extends Closeable { + + byte[] get(NamedKey key); + + void put(NamedKey key, byte[] value); + + Map getBulk(Iterable keys); + + CacheStats getStats(); + + boolean isLocal(); + + class NamedKey { + public final String namespace; + public final byte[] key; + + public NamedKey(String namespace, byte[] key) { + Preconditions.checkNotNull(namespace, "Namespace must not be null"); + Preconditions.checkNotNull(key, "Key must not be null"); + this.namespace = namespace; + this.key = key; + } + + public byte[] toByteArray() { + final byte[] nsBytes = namespace.getBytes(StandardCharsets.UTF_8); + return ByteBuffer.allocate(Integer.BYTES + nsBytes.length + key.length) + .putInt(nsBytes.length) + .put(nsBytes) + .put(key) + .array(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o instanceof NamedKey) { + NamedKey namedKey = (NamedKey) o; + return namespace.equals(namedKey.namespace) && Arrays.equals(key, namedKey.key); + } + + return false; + } + + @Override + public int hashCode() { + return 31 * namespace.hashCode() + Arrays.hashCode(key); + } + + @Override + public String toString() { + return namespace + "_" + new String(key, StandardCharsets.UTF_8); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/CacheExecutorFactory.java b/fe/fe-core/src/main/java/org/apache/doris/cache/CacheExecutorFactory.java new file mode 100644 index 00000000000000..9a94f549c74ad5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/CacheExecutorFactory.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cache; + +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; + +public enum CacheExecutorFactory { + COMMON_FJP { + @Override + public Executor createExecutor() { + return ForkJoinPool.commonPool(); + } + }, + SINGLE_THREAD { + @Override + public Executor createExecutor() { + return new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()) { + @Override + protected void finalize() { + this.shutdown(); + } + }; + } + }; + + public abstract Executor createExecutor(); + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/CacheFactory.java b/fe/fe-core/src/main/java/org/apache/doris/cache/CacheFactory.java new file mode 100644 index 00000000000000..46de0ed507e9f3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/CacheFactory.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cache; + +public class CacheFactory { + private static volatile Cache universalCache; + + /** + * Initialize the result cache. + * + * @return + */ + public static Cache getUniversalCache() { + if (universalCache == null) { + synchronized (CacheFactory.class) { + if (universalCache == null) { + // Now just use the simple local cache. + // TODO Felix: add global/central cache or hybrid cache (l1/l2) + universalCache = SimpleLocalCache.create(); + } + } + } + return universalCache; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/CacheStats.java b/fe/fe-core/src/main/java/org/apache/doris/cache/CacheStats.java new file mode 100644 index 00000000000000..c2fa36e5b00d93 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/CacheStats.java @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cache; + +/** + * Basic statistics for cache efficiency + */ +public class CacheStats { + /** + * Accumulated number of cache hits + */ + private final long numHits; + + /** + * Accumulated number of cache misses + */ + private final long numMisses; + + /** + * Accumulated number of cache size by entries + */ + private final long size; + + /** + * Accumulated number of cache size by bytes + */ + private final long sizeInBytes; + + /** + * Accumulated number of cache evictions + */ + private final long numEvictions; + + /** + * Accumulated number of cache timeouts + */ + private final long numTimeouts; + + /** + * Accumulated number of cache errors + */ + private final long numErrors; + + public CacheStats( + long numHits, + long numMisses, + long size, + long sizeInBytes, + long numEvictions, + long numTimeouts, + long numErrors + ) { + this.numHits = numHits; + this.numMisses = numMisses; + this.size = size; + this.sizeInBytes = sizeInBytes; + this.numEvictions = numEvictions; + this.numTimeouts = numTimeouts; + this.numErrors = numErrors; + } + + public long getNumHits() { + return numHits; + } + + public long getNumMisses() { + return numMisses; + } + + public long getNumEntries() { + return size; + } + + public long getSizeInBytes() { + return sizeInBytes; + } + + public long getNumEvictions() { + return numEvictions; + } + + public long getNumTimeouts() { + return numTimeouts; + } + + public long getNumErrors() { + return numErrors; + } + + public long numLookups() { + return numHits + numMisses; + } + + public double hitRate() { + long lookups = numLookups(); + return lookups == 0 ? 0 : numHits / (double) lookups; + } + + public long averageBytes() { + return size == 0 ? 0 : sizeInBytes / size; + } + + public CacheStats delta(CacheStats oldStats) { + if (oldStats == null) { + return this; + } + return new CacheStats( + numHits - oldStats.numHits, + numMisses - oldStats.numMisses, + size - oldStats.size, + sizeInBytes - oldStats.sizeInBytes, + numEvictions - oldStats.numEvictions, + numTimeouts - oldStats.numTimeouts, + numErrors - oldStats.numErrors + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/GlobalCache.java b/fe/fe-core/src/main/java/org/apache/doris/cache/GlobalCache.java new file mode 100644 index 00000000000000..45a93777366dcb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/GlobalCache.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cache; + +import java.io.IOException; +import java.util.Map; + +/** + * TODO Felix: implement a global one with JimDB + */ +public class GlobalCache implements Cache { + @Override + public byte[] get(NamedKey key) { + return new byte[0]; + } + + @Override + public void put(NamedKey key, byte[] value) { + + } + + @Override + public Map getBulk(Iterable keys) { + return null; + } + + @Override + public CacheStats getStats() { + return null; + } + + @Override + public boolean isLocal() { + return false; + } + + @Override + public void close() throws IOException { + + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/HybridCache.java b/fe/fe-core/src/main/java/org/apache/doris/cache/HybridCache.java new file mode 100644 index 00000000000000..2e07560da861ed --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/HybridCache.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cache; + +import java.io.IOException; +import java.util.Map; + +/** + * TODO Felix: Implement a hybrid cache with multiple-level cache support. + */ +public class HybridCache implements Cache { + @Override + public byte[] get(NamedKey key) { + return new byte[0]; + } + + @Override + public void put(NamedKey key, byte[] value) { + + } + + @Override + public Map getBulk(Iterable keys) { + return null; + } + + @Override + public boolean isLocal() { + return false; + } + + @Override + public void close() throws IOException { + + } + + @Override + public CacheStats getStats() { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java b/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java new file mode 100644 index 00000000000000..389b0e21ebd597 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cache; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; +import org.apache.doris.common.Config; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.OptionalLong; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * A base cache on each FE node, local only + */ +public class SimpleLocalCache implements Cache { + private static final Logger LOG = LogManager.getLogger(SimpleLocalCache.class); + /** + * Minimum cost in "weight" per entry; + */ + private static final int FIXED_COST = 8; + private static final int MAX_DEFAULT_BYTES = 1024 * 1024 * 1024; + private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance(); + private static final LZ4FastDecompressor LZ4_DECOMPRESSOR = LZ4_FACTORY.fastDecompressor(); + private static final LZ4Compressor LZ4_COMPRESSOR = LZ4_FACTORY.fastCompressor(); + + private final com.github.benmanes.caffeine.cache.Cache cache; + + public static Cache create() { + return create(CacheExecutorFactory.COMMON_FJP.createExecutor()); + } + + // Used in testing + public static Cache create(final Executor executor) { + LOG.info("Instance cache with expiration " + Config.result_cache_expire_after_in_milliseconds + + " milliseconds, max size " + Config.result_cache_size_in_bytes + " bytes"); + Caffeine builder = Caffeine.newBuilder().recordStats(); + if (Config.result_cache_expire_after_in_milliseconds >= 0) { + builder.expireAfterWrite(Config.result_cache_expire_after_in_milliseconds, TimeUnit.MILLISECONDS); + } + if (Config.result_cache_size_in_bytes >= 0) { + builder.maximumWeight(Config.result_cache_size_in_bytes); + } else { + builder.maximumWeight(Math.min(MAX_DEFAULT_BYTES, Runtime.getRuntime().maxMemory() / 10)); + } + builder.weigher((NamedKey key, byte[] value) -> value.length + + key.key.length + + key.namespace.length() * Character.BYTES + + FIXED_COST) + .executor(executor); + return new SimpleLocalCache(builder.build()); + } + + private SimpleLocalCache(final com.github.benmanes.caffeine.cache.Cache cache) { + this.cache = cache; + } + + @Override + public byte[] get(NamedKey key) { + return decompress(cache.getIfPresent(key)); + } + + @Override + public void put(NamedKey key, byte[] value) { + cache.put(key, compress(value)); + } + + @Override + public Map getBulk(Iterable keys) { + // The assumption here is that every value is accessed at least once. Materializing here ensures deserialize is only + // called *once* per value. + return ImmutableMap.copyOf(Maps.transformValues(cache.getAllPresent(keys), this::decompress)); + } + + // This is completely racy with put. Any values missed should be evicted later anyways. So no worries. + public void close(String namespace) { + // Evict on close + cache.asMap().keySet().removeIf(key -> key.namespace.equals(namespace)); + } + + @Override + public void close() { + cache.cleanUp(); + } + + @Override + public CacheStats getStats() { + final com.github.benmanes.caffeine.cache.stats.CacheStats stats = cache.stats(); + final long size = cache + .policy().eviction() + .map(eviction -> eviction.isWeighted() ? eviction.weightedSize() : OptionalLong.empty()) + .orElse(OptionalLong.empty()).orElse(-1); + return new CacheStats( + stats.hitCount(), + stats.missCount(), + cache.estimatedSize(), + size, + stats.evictionCount(), + 0, + stats.loadFailureCount() + ); + } + + @Override + public boolean isLocal() { + return true; + } + + + @VisibleForTesting + com.github.benmanes.caffeine.cache.Cache getCache() { + return cache; + } + + private byte[] decompress(byte[] bytes) { + if (bytes == null) { + return null; + } + final int decompressedLen = ByteBuffer.wrap(bytes).getInt(); + final byte[] out = new byte[decompressedLen]; + LZ4_DECOMPRESSOR.decompress(bytes, Integer.BYTES, out, 0, out.length); + return out; + } + + private byte[] compress(byte[] value) { + final int len = LZ4_COMPRESSOR.maxCompressedLength(value.length); + final byte[] out = new byte[len]; + final int compressedSize = LZ4_COMPRESSOR.compress(value, 0, value.length, out, 0); + return ByteBuffer.allocate(compressedSize + Integer.BYTES) + .putInt(value.length) + .put(out, 0, compressedSize) + .array(); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 26fccc0c42541f..abbd177bc1e681 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1208,4 +1208,22 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_allowed_in_element_num_of_delete = 1024; + /** + * Whether or not the result cache is enabled in Fe level, it can be overwritten with connection/session + * level setting in Context. + */ + @ConfField(mutable = true) + public static boolean enable_result_cache = false; + + /** + * Specify how long an entry will be expired in milliseconds, 10000 by default. + */ + @ConfField(mutable = true) + public static long result_cache_expire_after_in_milliseconds = 10*1000; + + /** + * Specify the overall threshold of local cache in bytes, 1G bytes by default. + */ + @ConfField(mutable = true) + public static long result_cache_size_in_bytes = 1024 * 1024 * 1024; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 35f453dcdaddca..fcce614ef34eb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -190,6 +190,8 @@ public final class FeMetaVersion { // force drop db, force drop table, force drop partition // make force drop operation do not recycle meta public static final int VERSION_89 = 89; + // fe result cache + public static final int VERSION_90 = 90; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_89; + public static final int VERSION_CURRENT = VERSION_90; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 1705bb632ccd88..fa9be0e2ee83e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -37,13 +37,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; /* - * if you want to visit the atrribute(such as queryID,defaultDb) + * if you want to visit the atrribute(such as queryID,defaultDb) * you can use profile.getInfoStrings("queryId") * All attributes can be seen from the above. - * + * * why the element in the finished profile arary is not RuntimeProfile, - * the purpose is let coordinator can destruct earlier(the fragment profile is in Coordinator) - * + * the purpose is let coordinator can destruct earlier(the fragment profile is in Coordinator) + * */ public class ProfileManager { private static final Logger LOG = LogManager.getLogger(ProfileManager.class); @@ -59,39 +59,40 @@ public class ProfileManager { public static final String SQL_STATEMENT = "Sql Statement"; public static final String USER = "User"; public static final String DEFAULT_DB = "Default Db"; - + public static final String IS_CACHED = "IS Cached"; + public static final ArrayList PROFILE_HEADERS = new ArrayList( Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE, - START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE)); - + START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE, IS_CACHED)); + private class ProfileElement { - public Map infoStrings = Maps.newHashMap(); + public Map infoStrings = Maps.newHashMap(); public String profileContent; } - + // only protect profileDeque; profileMap is concurrent, no need to protect - private ReentrantReadWriteLock lock; + private ReentrantReadWriteLock lock; private ReadLock readLock; private WriteLock writeLock; private Deque profileDeque; private Map profileMap; // from QueryId to RuntimeProfile - + public static ProfileManager getInstance() { if (INSTANCE == null) { INSTANCE = new ProfileManager(); } return INSTANCE; } - + private ProfileManager() { - lock = new ReentrantReadWriteLock(true); + lock = new ReentrantReadWriteLock(true); readLock = lock.readLock(); writeLock = lock.writeLock(); profileDeque = new LinkedList(); profileMap = new ConcurrentHashMap(); } - + public ProfileElement createElement(RuntimeProfile profile) { ProfileElement element = new ProfileElement(); RuntimeProfile summaryProfile = profile.getChildList().get(0).first; @@ -101,12 +102,12 @@ public ProfileElement createElement(RuntimeProfile profile) { element.profileContent = profile.toString(); return element; } - + public void pushProfile(RuntimeProfile profile) { if (profile == null) { return; } - + ProfileElement element = createElement(profile); String queryId = element.infoStrings.get(ProfileManager.QUERY_ID); // check when push in, which can ensure every element in the list has QUERY_ID column, @@ -115,10 +116,10 @@ public void pushProfile(RuntimeProfile profile) { LOG.warn("the key or value of Map is null, " + "may be forget to insert 'QUERY_ID' column into infoStrings"); } - + profileMap.put(queryId, element); writeLock.lock(); - try { + try { if (profileDeque.size() >= ARRAY_SIZE) { profileMap.remove(profileDeque.getFirst().infoStrings.get(QUERY_ID)); profileDeque.removeFirst(); @@ -128,7 +129,7 @@ public void pushProfile(RuntimeProfile profile) { writeLock.unlock(); } } - + public List> getAllQueries() { List> result = Lists.newArrayList(); readLock.lock(); @@ -137,9 +138,9 @@ public List> getAllQueries() { while (reverse.hasNext()) { ProfileElement element = (ProfileElement) reverse.next(); Map infoStrings = element.infoStrings; - + List row = Lists.newArrayList(); - for (String str : PROFILE_HEADERS ) { + for (String str : PROFILE_HEADERS) { row.add(infoStrings.get(str)); } result.add(row); @@ -149,7 +150,7 @@ public List> getAllQueries() { } return result; } - + public String getProfile(String queryID) { readLock.lock(); try { @@ -157,7 +158,7 @@ public String getProfile(String queryID) { if (element == null) { return null; } - + return element.profileContent; } finally { readLock.unlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/StringUtils.java similarity index 63% rename from fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java rename to fe/fe-core/src/main/java/org/apache/doris/common/util/StringUtils.java index a66bc4f02d85aa..c382457f91b735 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/StringUtils.java @@ -15,22 +15,23 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.metric; +package org.apache.doris.common.util; -public class GaugeMetricImpl extends GaugeMetric { +import com.google.common.base.Preconditions; - public GaugeMetricImpl(String name, MetricUnit unit, String description) { - super(name, unit, description); - } - - private T value; - - public void setValue(T v) { - this.value = v; - } +import java.nio.charset.StandardCharsets; - @Override - public T getValue() { - return value; +/** + * Common String utilities for Doris + */ +public class StringUtils { + /** + * Get UTF 8 bytes from input string + * @param str + * @return + */ + public static byte[] toUtf8(String str){ + Preconditions.checkNotNull(str, "Input String for UTF8 conversion is null!"); + return str.getBytes(StandardCharsets.UTF_8); } -} +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetric.java b/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetric.java index 2e8d8191d20c00..80480b2abc7341 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetric.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/GaugeMetric.java @@ -20,9 +20,27 @@ /* * Gauge metric is updated every time it is visited */ -public abstract class GaugeMetric extends Metric { - +public class GaugeMetric extends Metric { + /** + * Construct an instance with specified name and description + * + * @param name + * @param description + * @return + */ public GaugeMetric(String name, MetricUnit unit, String description) { super(name, MetricType.GAUGE, unit, description); } + + private T value; + + public void setValue(T v) { + this.value = v; + } + + @Override + public T getValue() { + return value; + } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java index 05aa38e79f7c3e..84ac07af5ffff0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java @@ -17,6 +17,9 @@ package org.apache.doris.metric; +import org.apache.doris.cache.CacheFactory; +import org.apache.doris.cache.CacheStats; + import java.util.List; import java.util.TimerTask; @@ -29,6 +32,7 @@ public class MetricCalculator extends TimerTask { private long lastQueryCounter = -1; private long lastRequestCounter = -1; private long lastQueryErrCounter = -1; + private CacheStats priorStats = null; @Override public void run() { @@ -65,6 +69,28 @@ private void update() { MetricRepo.GAUGE_QUERY_ERR_RATE.setValue(errRate < 0 ? 0.0 : errRate); lastQueryErrCounter = currentErrCounter; + // Cache stats + if (priorStats == null) { + priorStats = CacheFactory.getUniversalCache().getStats(); + MetricRepo.COUNTER_RESULT_CACHE_ERRORS.increase(priorStats.getNumErrors()); + MetricRepo.COUNTER_RESULT_CACHE_TIMEOUTS.increase(priorStats.getNumTimeouts()); + MetricRepo.COUNTER_RESULT_CACHE_EVICTIONS.increase(priorStats.getNumEvictions()); + MetricRepo.GAUGE_RESULT_CACHE_SIZE_IN_BYTES.setValue(priorStats.getSizeInBytes()); + MetricRepo.GAUGE_RESULT_CACHE_ENTRIES.setValue(priorStats.getNumEntries()); + MetricRepo.COUNTER_RESULT_CACHE_MISSES.increase(priorStats.getNumMisses()); + MetricRepo.COUNTER_RESULT_CACHE_HITS.increase(priorStats.getNumHits()); + } else { + CacheStats currentStats = CacheFactory.getUniversalCache().getStats(); + MetricRepo.COUNTER_RESULT_CACHE_ERRORS.increase(deltaCounter(priorStats.getNumErrors(), currentStats.getNumErrors())); + MetricRepo.COUNTER_RESULT_CACHE_TIMEOUTS.increase(deltaCounter(priorStats.getNumTimeouts(), currentStats.getNumTimeouts())); + MetricRepo.COUNTER_RESULT_CACHE_EVICTIONS.increase(deltaCounter(priorStats.getNumEvictions(), currentStats.getNumEvictions())); + MetricRepo.GAUGE_RESULT_CACHE_SIZE_IN_BYTES.setValue(currentStats.getSizeInBytes()); + MetricRepo.GAUGE_RESULT_CACHE_ENTRIES.setValue(currentStats.getNumEntries()); + MetricRepo.COUNTER_RESULT_CACHE_MISSES.increase(deltaCounter(priorStats.getNumMisses(), currentStats.getNumMisses())); + MetricRepo.COUNTER_RESULT_CACHE_HITS.increase(deltaCounter(priorStats.getNumHits(), currentStats.getNumHits())); + priorStats = currentStats; + } + lastTs = currentTs; // max tabet compaction score of all backends @@ -77,4 +103,15 @@ private void update() { } MetricRepo.GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(maxCompactionScore); } + + /** + * compute delta value for counter metrics + * + * @param prior + * @param curent + * @return + */ + private static long deltaCounter(long prior, long curent) { + return Math.max(0L, curent - prior); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index e4e2b99f23a8a1..677e35f4875cfe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -51,7 +51,7 @@ public final class MetricRepo { private static final MetricRegistry METRIC_REGISTER = new MetricRegistry(); private static final DorisMetricRegistry PALO_METRIC_REGISTER = new DorisMetricRegistry(); - + public static volatile boolean isInit = false; public static final SystemMetrics SYSTEM_METRICS = new SystemMetrics(); @@ -84,14 +84,23 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES; public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS; + // Metrics for the result cache + public static LongCounterMetric COUNTER_RESULT_CACHE_HITS; + public static LongCounterMetric COUNTER_RESULT_CACHE_MISSES; + public static GaugeMetric GAUGE_RESULT_CACHE_ENTRIES; + public static GaugeMetric GAUGE_RESULT_CACHE_SIZE_IN_BYTES; + public static LongCounterMetric COUNTER_RESULT_CACHE_EVICTIONS; + public static LongCounterMetric COUNTER_RESULT_CACHE_TIMEOUTS; + public static LongCounterMetric COUNTER_RESULT_CACHE_ERRORS; + public static Histogram HISTO_QUERY_LATENCY; public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY; // following metrics will be updated by metric calculator - public static GaugeMetricImpl GAUGE_QUERY_PER_SECOND; - public static GaugeMetricImpl GAUGE_REQUEST_PER_SECOND; - public static GaugeMetricImpl GAUGE_QUERY_ERR_RATE; - public static GaugeMetricImpl GAUGE_MAX_TABLET_COMPACTION_SCORE; + public static GaugeMetric GAUGE_QUERY_PER_SECOND; + public static GaugeMetric GAUGE_REQUEST_PER_SECOND; + public static GaugeMetric GAUGE_QUERY_ERR_RATE; + public static GaugeMetric GAUGE_MAX_TABLET_COMPACTION_SCORE; private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true); private static MetricCalculator metricCalculator = new MetricCalculator(); @@ -117,8 +126,8 @@ public Long getValue() { } }; gauge.addLabel(new MetricLabel("job", "load")) - .addLabel(new MetricLabel("type", jobType.name())) - .addLabel(new MetricLabel("state", state.name())); + .addLabel(new MetricLabel("type", jobType.name())) + .addLabel(new MetricLabel("state", state.name())); PALO_METRIC_REGISTER.addPaloMetrics(gauge); } } @@ -129,7 +138,7 @@ public Long getValue() { if (jobType != JobType.SCHEMA_CHANGE && jobType != JobType.ROLLUP) { continue; } - + GaugeMetric gauge = (GaugeMetric) new GaugeMetric("job", MetricUnit.NOUNIT, "job statistics") { @Override @@ -145,8 +154,8 @@ public Long getValue() { } }; gauge.addLabel(new MetricLabel("job", "alter")) - .addLabel(new MetricLabel("type", jobType.name())) - .addLabel(new MetricLabel("state", "running")); + .addLabel(new MetricLabel("type", jobType.name())) + .addLabel(new MetricLabel("state", "running")); PALO_METRIC_REGISTER.addPaloMetrics(gauge); } @@ -192,16 +201,16 @@ public Long getValue() { // qps, rps and error rate // these metrics should be set an init value, in case that metric calculator is not running - GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps", MetricUnit.NOUNIT, "query per second"); + GAUGE_QUERY_PER_SECOND = new GaugeMetric<>("qps", MetricUnit.NOUNIT, "query per second"); GAUGE_QUERY_PER_SECOND.setValue(0.0); PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_PER_SECOND); - GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", MetricUnit.NOUNIT, "request per second"); + GAUGE_REQUEST_PER_SECOND = new GaugeMetric<>("rps", MetricUnit.NOUNIT, "request per second"); GAUGE_REQUEST_PER_SECOND.setValue(0.0); PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_REQUEST_PER_SECOND); - GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", MetricUnit.NOUNIT, "query error rate"); + GAUGE_QUERY_ERR_RATE = new GaugeMetric<>("query_err_rate", MetricUnit.NOUNIT, "query error rate"); PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_ERR_RATE); GAUGE_QUERY_ERR_RATE.setValue(0.0); - GAUGE_MAX_TABLET_COMPACTION_SCORE = new GaugeMetricImpl<>("max_tablet_compaction_score", + GAUGE_MAX_TABLET_COMPACTION_SCORE = new GaugeMetric<>("max_tablet_compaction_score", MetricUnit.NOUNIT, "max tablet compaction score of all backends"); PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE); GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L); @@ -224,17 +233,17 @@ public Long getValue() { PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_MODE_SQL); COUNTER_CACHE_HIT_SQL = new LongCounterMetric("cache_hit_sql", MetricUnit.REQUESTS, "total hits query by sql model"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_HIT_SQL); - COUNTER_CACHE_MODE_PARTITION = new LongCounterMetric("query_mode_partition", MetricUnit.REQUESTS, - "total query of partition mode"); + COUNTER_CACHE_MODE_PARTITION = new LongCounterMetric("query_mode_partition", MetricUnit.REQUESTS, + "total query of partition mode"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_MODE_PARTITION); - COUNTER_CACHE_HIT_PARTITION = new LongCounterMetric("cache_hit_partition", MetricUnit.REQUESTS, - "total hits query by partition model"); + COUNTER_CACHE_HIT_PARTITION = new LongCounterMetric("cache_hit_partition", MetricUnit.REQUESTS, + "total hits query by partition model"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_HIT_PARTITION); - COUNTER_CACHE_PARTITION_ALL = new LongCounterMetric("partition_all", MetricUnit.REQUESTS, - "scan partition of cache partition model"); + COUNTER_CACHE_PARTITION_ALL = new LongCounterMetric("partition_all", MetricUnit.REQUESTS, + "scan partition of cache partition model"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_PARTITION_ALL); - COUNTER_CACHE_PARTITION_HIT = new LongCounterMetric("partition_hit", MetricUnit.REQUESTS, - "hit partition of cache partition model"); + COUNTER_CACHE_PARTITION_HIT = new LongCounterMetric("partition_hit", MetricUnit.REQUESTS, + "hit partition of cache partition model"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_PARTITION_HIT); COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", MetricUnit.REQUESTS, "total load finished"); @@ -269,6 +278,21 @@ public Long getValue() { "total error rows of routine load"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS); + COUNTER_RESULT_CACHE_HITS = new LongCounterMetric("result_cache_hits", MetricUnit.NOUNIT,"Accumulated number of cache hits"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_HITS); + COUNTER_RESULT_CACHE_MISSES = new LongCounterMetric("result_cache_misses", MetricUnit.NOUNIT,"Accumulated number of cache misses"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_MISSES); + GAUGE_RESULT_CACHE_ENTRIES = new GaugeMetric("result_cache_entries", MetricUnit.NOUNIT,"Accumulated number of cache size by entries"); + PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_RESULT_CACHE_ENTRIES); + GAUGE_RESULT_CACHE_SIZE_IN_BYTES = new GaugeMetric("result_cache_size_in_bytes", MetricUnit.BYTES,"Accumulated number of cache size by bytes"); + PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_RESULT_CACHE_SIZE_IN_BYTES); + COUNTER_RESULT_CACHE_EVICTIONS = new LongCounterMetric("result_cache_evictions", MetricUnit.NOUNIT,"Accumulated number of cache evictions"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_EVICTIONS); + COUNTER_RESULT_CACHE_TIMEOUTS = new LongCounterMetric("result_cache_timeouts", MetricUnit.NOUNIT,"Accumulated number of cache timeouts"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TIMEOUTS); + COUNTER_RESULT_CACHE_ERRORS= new LongCounterMetric("result_cache_errors", MetricUnit.NOUNIT,"Accumulated number of cache errors"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_ERRORS); + // 3. histogram HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms")); HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("editlog", "write", "latency", "ms")); @@ -404,7 +428,7 @@ public static synchronized String getMetric(MetricVisitor visitor) { for (Map.Entry entry : histograms.entrySet()) { visitor.visitHistogram(sb, entry.getKey(), entry.getValue()); } - + // node info visitor.getNodeInfo(sb); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index fa7ba25d394518..afb73e1cc6cb80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -248,6 +248,7 @@ private void writeBuffer(ByteBuffer buffer) throws IOException { public void sendOnePacket(ByteBuffer packet) throws IOException { int bufLen; int oldLimit = packet.limit(); + packet.mark(); while (oldLimit - packet.position() >= MAX_PHYSICAL_PACKET_LENGTH) { bufLen = MAX_PHYSICAL_PACKET_LENGTH; packet.limit(packet.position() + bufLen); @@ -259,6 +260,8 @@ public void sendOnePacket(ByteBuffer packet) throws IOException { packet.limit(oldLimit); writeBuffer(packet); accSequenceId(); + //Restore to state before read,as we may cache it for future use. + packet.reset(); } public void sendAndFlush(ByteBuffer packet) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java index b6dfce96ecead0..19b6a7c3c36635 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java @@ -20,9 +20,12 @@ import org.apache.doris.proto.PQueryStatistics; import org.apache.doris.thrift.TResultBatch; -public final class RowBatch { +import java.io.Serializable; + +public final class RowBatch implements Serializable { private TResultBatch batch; - private PQueryStatistics statistics; + //transient for cache + private transient PQueryStatistics statistics; private boolean eos; public RowBatch() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index d9e72413b668cc..96f5152dfafb6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -37,7 +37,7 @@ // System variable public class SessionVariable implements Serializable, Writable { - + static final Logger LOG = LogManager.getLogger(StmtExecutor.class); public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; public static final String QUERY_TIMEOUT = "query_timeout"; @@ -67,7 +67,7 @@ public class SessionVariable implements Serializable, Writable { public static final String NET_BUFFER_LENGTH = "net_buffer_length"; public static final String CODEGEN_LEVEL = "codegen_level"; // mem limit can't smaller than bufferpool's default page size - public static final int MIN_EXEC_MEM_LIMIT = 2097152; + public static final int MIN_EXEC_MEM_LIMIT = 2097152; public static final String BATCH_SIZE = "batch_size"; public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations"; public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join"; @@ -75,9 +75,10 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String ENABLE_SPILLING = "enable_spilling"; public static final String PREFER_JOIN_METHOD = "prefer_join_method"; - + public static final String ENABLE_SQL_CACHE = "enable_sql_cache"; public static final String ENABLE_PARTITION_CACHE = "enable_partition_cache"; + public static final String ENABLE_RESULT_CACHE = "enable_result_cache"; public static final int MIN_EXEC_INSTANCE_NUM = 1; public static final int MAX_EXEC_INSTANCE_NUM = 32; @@ -86,7 +87,7 @@ public class SessionVariable implements Serializable, Writable { // user can set instance num after exchange, no need to be equal to nums of before exchange public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num"; /* - * configure the mem limit of load process on BE. + * configure the mem limit of load process on BE. * Previously users used exec_mem_limit to set memory limits. * To maintain compatibility, the default value of load_mem_limit is 0, * which means that the load memory limit is still using exec_mem_limit. @@ -233,6 +234,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PARTITION_CACHE) private boolean enablePartitionCache = false; + @VariableMgr.VarAttr(name = ENABLE_RESULT_CACHE) + private boolean enableResultCache = false; + @VariableMgr.VarAttr(name = FORWARD_TO_MASTER) private boolean forwardToMaster = false; @@ -411,9 +415,13 @@ public boolean isDisableColocateJoin() { return disableColocateJoin; } - public String getPreferJoinMethod() {return preferJoinMethod; } + public String getPreferJoinMethod() { + return preferJoinMethod; + } - public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; } + public void setPreferJoinMethod(String preferJoinMethod) { + this.preferJoinMethod = preferJoinMethod; + } public int getParallelExecInstanceNum() { return parallelExecInstanceNum; @@ -423,7 +431,9 @@ public int getExchangeInstanceParallel() { return exchangeInstanceParallel; } - public boolean getEnableInsertStrict() { return enableInsertStrict; } + public boolean getEnableInsertStrict() { + return enableInsertStrict; + } public void setEnableInsertStrict(boolean enableInsertStrict) { this.enableInsertStrict = enableInsertStrict; @@ -444,13 +454,33 @@ public boolean isEnablePartitionCache() { public void setEnablePartitionCache(boolean enablePartitionCache) { this.enablePartitionCache = enablePartitionCache; } - + + /** + * Check if the result cache is enabled for this session. True by default. + * + * @return True for cached-enabled, otherwise false. + */ + public boolean isEnableResultCache() { + return enableResultCache; + } + + /** + * Turn on/off result cache for this session. + * + * @param resultCacheEnabledInSession + */ + public void setEnableResultCache(boolean resultCacheEnabledInSession) { + this.enableResultCache = resultCacheEnabledInSession; + } + // Serialize to thrift object public boolean getForwardToMaster() { return forwardToMaster; } - public boolean isUseV2Rollup() { return useV2Rollup; } + public boolean isUseV2Rollup() { + return useV2Rollup; + } // for unit test public void setUseV2Rollup(boolean useV2Rollup) { @@ -623,6 +653,9 @@ public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) { exchangeInstanceParallel = in.readInt(); } + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_90) { + enableResultCache = in.readBoolean(); + } } else { readFromJson(in); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 09e2e3cee7c20c..4431cffe1a3748 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -36,6 +36,8 @@ import org.apache.doris.analysis.StmtRewriter; import org.apache.doris.analysis.UnsupportedStmt; import org.apache.doris.analysis.UseStmt; +import org.apache.doris.cache.Cache; +import org.apache.doris.cache.CacheFactory; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -55,6 +57,7 @@ import org.apache.doris.common.util.ProfileManager; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.common.util.StringUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.EtlJobType; import org.apache.doris.metric.MetricRepo; @@ -81,17 +84,22 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.commons.lang.SerializationUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; import java.io.StringReader; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.doris.cache.Cache.NamedKey; + // Do one COM_QEURY process. // first: Parse receive byte array to statement struct. // second: Do handle function for statement. @@ -114,6 +122,7 @@ public class StmtExecutor { private boolean isProxy; private ShowResultSet proxyResultSet = null; private PQueryStatistics statisticsForAuditLog; + private boolean isCached; // this constructor is mainly for proxy public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) { @@ -155,6 +164,8 @@ public void initProfile(long beginTimeInNanoSecond) { summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser()); summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase()); summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt); + // Add additional information to query profile summary + summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No"); profile.addChild(summaryProfile); if (coord != null) { coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond)); @@ -242,10 +253,9 @@ public void execute() throws Exception { if (parsedStmt instanceof QueryStmt) { context.getState().setIsQuery(true); int retryTime = Config.max_query_retry_time; - for (int i = 0; i < retryTime; i ++) { + for (int i = 0; i < retryTime; i++) { try { - handleQueryStmt(); - if (context.getSessionVariable().isReportSucc()) { + if (!handleQueryStmt() && context.getSessionVariable().isReportSucc()) { writeProfile(beginTimeInNanoSecond); } break; @@ -388,7 +398,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException { LOG.info("analysis exception happened when parsing stmt {}, id: {}, error: {}", originStmt, context.getStmtId(), syntaxError, e); if (syntaxError == null) { - throw e; + throw e; } else { throw new AnalysisException(syntaxError, e); } @@ -407,7 +417,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException { if (isForwardToMaster()) { return; } - + analyzer = new Analyzer(context.getCatalog(), context); // Convert show statement to select statement here if (parsedStmt instanceof ShowStmt) { @@ -487,7 +497,7 @@ private void analyzeAndGenerateQueryPlan(TQueryOptions tQueryOptions) throws Use // types and column labels to restore them after the rewritten stmt has been // reset() and re-analyzed. List origResultTypes = Lists.newArrayList(); - for (Expr e: parsedStmt.getResultExprs()) { + for (Expr e : parsedStmt.getResultExprs()) { origResultTypes.add(e.getType()); } List origColLabels = @@ -552,7 +562,7 @@ private void handleKill() throws DdlException { // Only user itself and user with admin priv can kill connection if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser()) && !Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), - PrivPredicate.ADMIN)) { + PrivPredicate.ADMIN)) { ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id); } @@ -576,31 +586,55 @@ private void handleSetStmt() { } // Process a select statement. - private void handleQueryStmt() throws Exception { + private boolean handleQueryStmt() throws Exception { // Every time set no send flag and clean all data in buffer context.getMysqlChannel().reset(); QueryStmt queryStmt = (QueryStmt) parsedStmt; + // Use connection ID as session identifier + NamedKey namedKey = new NamedKey(String.valueOf(context.getConnectionId()), + StringUtils.toUtf8(queryStmt.toSql())); + LOG.debug("Result Cache NamedKey [" + namedKey + "]"); + QueryDetail queryDetail = new QueryDetail(context.getStartTime(), - DebugUtil.printId(context.queryId()), - context.getStartTime(), -1, -1, - QueryDetail.QueryMemState.RUNNING, - context.getDatabase(), - originStmt.originStmt); + DebugUtil.printId(context.queryId()), + context.getStartTime(), -1, -1, + QueryDetail.QueryMemState.RUNNING, + context.getDatabase(), + originStmt.originStmt); context.setQueryDetail(queryDetail); QueryDetailQueue.addOrUpdateQueryDetail(queryDetail); if (queryStmt.isExplain()) { - String explainString = planner.getExplainString(planner.getFragments(), queryStmt.isVerbose() ? TExplainLevel.VERBOSE: TExplainLevel.NORMAL.NORMAL); + String explainString = planner.getExplainString(planner.getFragments(), queryStmt.isVerbose() ? TExplainLevel.VERBOSE : TExplainLevel.NORMAL.NORMAL); handleExplainStmt(explainString); - return; + return false; } coord = new Coordinator(context, analyzer, planner); - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - coord.exec(); + boolean isCacheEnabled = context.getSessionVariable().isEnableResultCache(); + LOG.debug("Session level cache is " + (isCacheEnabled ? "enabled" : false)); + Cache cache = null; + byte[] cachedVal = null; + ArrayList batches = null; + if (isCacheEnabled) { + cache = CacheFactory.getUniversalCache(); + cachedVal = cache.get(namedKey); + } + + isCached = (cachedVal != null); + if (isCached) { + batches = (ArrayList) SerializationUtils.deserialize(cachedVal); + } else { + coord.exec(); + if (isCacheEnabled) { + // List is not serializable but ArrayList is. + batches = new ArrayList<>(); + } + } // if python's MysqlDb get error after sendfields, it can't catch the exception // so We need to send fields after first batch arrived @@ -619,19 +653,38 @@ private void handleQueryStmt() throws Exception { if (!isOutfileQuery) { sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); } + Iterator itr = (isCached) ? batches.iterator() : null; + if (isCached && itr == null) { + isCached = false; + LOG.info("do not get batches from SerializationUtils"); + } while (true) { - batch = coord.getNext(); + if (isCached) { + // Theoretically, the batch must have next before gets into eof. + // Otherwise, it is corrupted result. + batch = itr.next(); + } else { + batch = coord.getNext(); + if (isCacheEnabled) { + batches.add(batch); + } + } // for outfile query, there will be only one empty batch send back with eos flag if (batch.getBatch() != null && !isOutfileQuery) { for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); - } - context.updateReturnRows(batch.getBatch().getRows().size()); + } + context.updateReturnRows(batch.getBatch().getRows().size()); } if (batch.isEos()) { break; } } + if (cachedVal == null && isCacheEnabled) { + cachedVal = SerializationUtils.serialize(batches); + cache.put(namedKey, cachedVal); + LOG.debug("Put into cache with named key: " + namedKey); + } statisticsForAuditLog = batch.getQueryStatistics(); if (!isOutfileQuery) { @@ -639,6 +692,7 @@ private void handleQueryStmt() throws Exception { } else { context.getState().setOk(statisticsForAuditLog.returned_rows, 0, ""); } + return isCached; } // Process a select statement. @@ -882,6 +936,7 @@ public void sendShowResult(ShowResultSet resultSet) throws IOException { context.getState().setEof(); } + // Process show statement private void handleShow() throws IOException, AnalysisException, DdlException { ShowExecutor executor = new ShowExecutor(context, (ShowStmt) parsedStmt); From cca5e171751fb0ad2167806922ae73bc6cfe6122 Mon Sep 17 00:00:00 2001 From: Zhengguo Yang Date: Fri, 21 Aug 2020 22:58:08 +0800 Subject: [PATCH 2/7] [doris] fix some comment and conflict --- .../org/apache/doris/alter/AlterJobV2.java | 1 + .../java/org/apache/doris/catalog/Column.java | 1 + .../org/apache/doris/catalog/FsBroker.java | 1 + .../apache/doris/catalog/TempPartitions.java | 1 + .../apache/doris/common/FeMetaVersion.java | 1 - .../org/apache/doris/persist/DropDbInfo.java | 1 + .../doris/persist/DropPartitionInfo.java | 1 + .../doris/persist/TruncateTableInfo.java | 1 + .../org/apache/doris/qe/SessionVariable.java | 95 ++++++++++--------- 9 files changed, 56 insertions(+), 47 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index 3a5147295ed0fe..f7578caa3f145b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -266,6 +266,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(timeoutMs); } + @Deprecated public void readFields(DataInput in) throws IOException { // read common members as write in AlterJobV2.write(). // except 'type' member, which is read in AlterJobV2.read() diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index 63aa2c51779f73..33352a86eee632 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -445,6 +445,7 @@ public void write(DataOutput out) throws IOException { Text.writeString(out, json); } + @Deprecated private void readFields(DataInput in) throws IOException { name = Text.readString(in); type = ColumnType.read(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java index 12e69ecad7b834..a7fffe2ae483a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java @@ -119,6 +119,7 @@ public void write(DataOutput out) throws IOException { Text.writeString(out, json); } + @Deprecated private void readFields(DataInput in) throws IOException { ip = Text.readString(in); port = in.readInt(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java index 4a1c59d6a095ab..b5fb03c87454d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java @@ -133,6 +133,7 @@ public static TempPartitions read(DataInput in) throws IOException { } } + @Deprecated private void readFields(DataInput in) throws IOException { int size = in.readInt(); for (int i = 0; i < size; i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 49483835af04bd..12d950ec1ab45a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -190,7 +190,6 @@ public final class FeMetaVersion { // force drop db, force drop table, force drop partition // make force drop operation do not recycle meta public static final int VERSION_89 = 89; - // fe result cache // for global variable persist public static final int VERSION_90 = 90; // note: when increment meta version, should assign the latest version to VERSION_CURRENT diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java index 1fb335cd379fe8..f6dce08765dc33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java @@ -51,6 +51,7 @@ public boolean isForceDrop() { return forceDrop; } + @Deprecated private void readFields(DataInput in) throws IOException { dbName = Text.readString(in); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java index f10b8d274897d3..91669cc8f55139 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java @@ -72,6 +72,7 @@ public boolean isForceDrop() { return forceDrop; } + @Deprecated private void readFields(DataInput in) throws IOException { dbId = in.readLong(); tableId = in.readLong(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java index 303212353e2e98..f75ad132816fd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java @@ -87,6 +87,7 @@ public void write(DataOutput out) throws IOException { Text.writeString(out, json); } + @Deprecated private void readFields(DataInput in) throws IOException { dbId = in.readLong(); tblId = in.readLong(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 96f5152dfafb6c..3e87241ed94dc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -608,54 +608,57 @@ public void write(DataOutput out) throws IOException { Text.writeString(out, root.toString()); } + @Deprecated + private void readFromStream(DataInput in) throws IOException { + codegenLevel = in.readInt(); + netBufferLength = in.readInt(); + sqlSafeUpdates = in.readInt(); + timeZone = Text.readString(in); + netReadTimeout = in.readInt(); + netWriteTimeout = in.readInt(); + waitTimeout = in.readInt(); + interactiveTimeout = in.readInt(); + queryCacheType = in.readInt(); + autoIncrementIncrement = in.readInt(); + maxAllowedPacket = in.readInt(); + sqlSelectLimit = in.readLong(); + sqlAutoIsNull = in.readBoolean(); + collationDatabase = Text.readString(in); + collationConnection = Text.readString(in); + charsetServer = Text.readString(in); + charsetResults = Text.readString(in); + charsetConnection = Text.readString(in); + charsetClient = Text.readString(in); + txIsolation = Text.readString(in); + autoCommit = in.readBoolean(); + resourceGroup = Text.readString(in); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_65) { + sqlMode = in.readLong(); + } else { + // read old version SQL mode + Text.readString(in); + sqlMode = 0L; + } + isReportSucc = in.readBoolean(); + queryTimeoutS = in.readInt(); + maxExecMemByte = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_37) { + collationServer = Text.readString(in); + } + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_38) { + batchSize = in.readInt(); + disableStreamPreaggregations = in.readBoolean(); + parallelExecInstanceNum = in.readInt(); + } + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) { + exchangeInstanceParallel = in.readInt(); + } + enableResultCache = in.readBoolean(); + } + public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_67) { - codegenLevel = in.readInt(); - netBufferLength = in.readInt(); - sqlSafeUpdates = in.readInt(); - timeZone = Text.readString(in); - netReadTimeout = in.readInt(); - netWriteTimeout = in.readInt(); - waitTimeout = in.readInt(); - interactiveTimeout = in.readInt(); - queryCacheType = in.readInt(); - autoIncrementIncrement = in.readInt(); - maxAllowedPacket = in.readInt(); - sqlSelectLimit = in.readLong(); - sqlAutoIsNull = in.readBoolean(); - collationDatabase = Text.readString(in); - collationConnection = Text.readString(in); - charsetServer = Text.readString(in); - charsetResults = Text.readString(in); - charsetConnection = Text.readString(in); - charsetClient = Text.readString(in); - txIsolation = Text.readString(in); - autoCommit = in.readBoolean(); - resourceGroup = Text.readString(in); - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_65) { - sqlMode = in.readLong(); - } else { - // read old version SQL mode - Text.readString(in); - sqlMode = 0L; - } - isReportSucc = in.readBoolean(); - queryTimeoutS = in.readInt(); - maxExecMemByte = in.readLong(); - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_37) { - collationServer = Text.readString(in); - } - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_38) { - batchSize = in.readInt(); - disableStreamPreaggregations = in.readBoolean(); - parallelExecInstanceNum = in.readInt(); - } - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) { - exchangeInstanceParallel = in.readInt(); - } - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_90) { - enableResultCache = in.readBoolean(); - } + readFromStream(in); } else { readFromJson(in); } From af98ad14875054846f2f0277058201dcb529f9cd Mon Sep 17 00:00:00 2001 From: turbo jason Date: Tue, 25 Aug 2020 15:16:59 +0800 Subject: [PATCH 3/7] [Apache][doris]set max cache size for per query --- .../apache/doris/cache/SimpleLocalCache.java | 6 + .../java/org/apache/doris/common/Config.java | 504 ++++++++++-------- 2 files changed, 302 insertions(+), 208 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java b/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java index 389b0e21ebd597..94fb7035f15385 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java @@ -86,6 +86,12 @@ public byte[] get(NamedKey key) { @Override public void put(NamedKey key, byte[] value) { + final byte[] compresssize = compress(value); + if (compresssize.length > Config.result_cache_size_per_query_in_bytes) { + LOG.info(" result size more than result_cache_size_per_query_in_bytes: " + + Config.result_cache_size_per_query_in_bytes + " so not storage in cache"); + return; + } cache.put(key, compress(value)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index f7c77b806c60fc..40d02d3f488a0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -21,93 +21,108 @@ import org.apache.doris.http.HttpServer; public class Config extends ConfigBase { - + /** * The max size of one sys log and audit log */ - @ConfField public static int log_roll_size_mb = 1024; // 1 GB + @ConfField + public static int log_roll_size_mb = 1024; // 1 GB /** * sys_log_dir: - * This specifies FE log dir. FE will produces 2 log files: - * fe.log: all logs of FE process. - * fe.warn.log all WARNING and ERROR log of FE process. - * + * This specifies FE log dir. FE will produces 2 log files: + * fe.log: all logs of FE process. + * fe.warn.log all WARNING and ERROR log of FE process. + *

* sys_log_level: - * INFO, WARNING, ERROR, FATAL - * + * INFO, WARNING, ERROR, FATAL + *

* sys_log_roll_num: - * Maximal FE log files to be kept within an sys_log_roll_interval. - * default is 10, which means there will be at most 10 log files in a day - * + * Maximal FE log files to be kept within an sys_log_roll_interval. + * default is 10, which means there will be at most 10 log files in a day + *

* sys_log_verbose_modules: - * Verbose modules. VERBOSE level is implemented by log4j DEBUG level. - * eg: - * sys_log_verbose_modules = org.apache.doris.catalog - * This will only print debug log of files in package org.apache.doris.catalog and all its sub packages. - * + * Verbose modules. VERBOSE level is implemented by log4j DEBUG level. + * eg: + * sys_log_verbose_modules = org.apache.doris.catalog + * This will only print debug log of files in package org.apache.doris.catalog and all its sub packages. + *

* sys_log_roll_interval: - * DAY: log suffix is yyyyMMdd - * HOUR: log suffix is yyyyMMddHH - * + * DAY: log suffix is yyyyMMdd + * HOUR: log suffix is yyyyMMddHH + *

* sys_log_delete_age: - * default is 7 days, if log's last modify time is 7 days ago, it will be deleted. - * support format: - * 7d 7 days - * 10h 10 hours - * 60m 60 mins - * 120s 120 seconds + * default is 7 days, if log's last modify time is 7 days ago, it will be deleted. + * support format: + * 7d 7 days + * 10h 10 hours + * 60m 60 mins + * 120s 120 seconds */ @ConfField public static String sys_log_dir = PaloFe.DORIS_HOME_DIR + "/log"; - @ConfField public static String sys_log_level = "INFO"; - @ConfField public static int sys_log_roll_num = 10; - @ConfField public static String[] sys_log_verbose_modules = {}; - @ConfField public static String sys_log_roll_interval = "DAY"; - @ConfField public static String sys_log_delete_age = "7d"; + @ConfField + public static String sys_log_level = "INFO"; + @ConfField + public static int sys_log_roll_num = 10; + @ConfField + public static String[] sys_log_verbose_modules = {}; + @ConfField + public static String sys_log_roll_interval = "DAY"; + @ConfField + public static String sys_log_delete_age = "7d"; @Deprecated - @ConfField public static String sys_log_roll_mode = "SIZE-MB-1024"; + @ConfField + public static String sys_log_roll_mode = "SIZE-MB-1024"; /** * audit_log_dir: - * This specifies FE audit log dir. - * Audit log fe.audit.log contains all requests with related infos such as user, host, cost, status, etc. - * + * This specifies FE audit log dir. + * Audit log fe.audit.log contains all requests with related infos such as user, host, cost, status, etc. + *

* audit_log_roll_num: - * Maximal FE audit log files to be kept within an audit_log_roll_interval. - * + * Maximal FE audit log files to be kept within an audit_log_roll_interval. + *

* audit_log_modules: - * Slow query contains all queries which cost exceed *qe_slow_log_ms* - * + * Slow query contains all queries which cost exceed *qe_slow_log_ms* + *

* qe_slow_log_ms: - * If the response time of a query exceed this threshold, it will be recored in audit log as slow_query. - * + * If the response time of a query exceed this threshold, it will be recored in audit log as slow_query. + *

* audit_log_roll_interval: - * DAY: log suffix is yyyyMMdd - * HOUR: log suffix is yyyyMMddHH - * + * DAY: log suffix is yyyyMMdd + * HOUR: log suffix is yyyyMMddHH + *

* audit_log_delete_age: - * default is 30 days, if log's last modify time is 30 days ago, it will be deleted. - * support format: - * 7d 7 days - * 10h 10 hours - * 60m 60 mins - * 120s 120 seconds - */ - @ConfField public static String audit_log_dir = PaloFe.DORIS_HOME_DIR + "/log"; - @ConfField public static int audit_log_roll_num = 90; - @ConfField public static String[] audit_log_modules = {"slow_query", "query"}; - @ConfField(mutable = true) public static long qe_slow_log_ms = 5000; - @ConfField public static String audit_log_roll_interval = "DAY"; - @ConfField public static String audit_log_delete_age = "30d"; + * default is 30 days, if log's last modify time is 30 days ago, it will be deleted. + * support format: + * 7d 7 days + * 10h 10 hours + * 60m 60 mins + * 120s 120 seconds + */ + @ConfField + public static String audit_log_dir = PaloFe.DORIS_HOME_DIR + "/log"; + @ConfField + public static int audit_log_roll_num = 90; + @ConfField + public static String[] audit_log_modules = {"slow_query", "query"}; + @ConfField(mutable = true) + public static long qe_slow_log_ms = 5000; + @ConfField + public static String audit_log_roll_interval = "DAY"; + @ConfField + public static String audit_log_delete_age = "30d"; @Deprecated - @ConfField public static String audit_log_roll_mode = "TIME-DAY"; + @ConfField + public static String audit_log_roll_mode = "TIME-DAY"; /** * plugin_dir: - * plugin install directory + * plugin install directory */ - @ConfField public static String plugin_dir = System.getenv("DORIS_HOME") + "/plugins"; + @ConfField + public static String plugin_dir = System.getenv("DORIS_HOME") + "/plugins"; @ConfField(mutable = true, masterOnly = true) public static boolean plugin_enable = true; @@ -120,24 +135,26 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int label_keep_max_second = 3 * 24 * 3600; // 3 days - + /** * The max keep time of some kind of jobs. * like schema change job and rollup job. */ @ConfField(mutable = true, masterOnly = true) public static int history_job_keep_max_second = 7 * 24 * 3600; // 7 days - + /** * Load label cleaner will run every *label_clean_interval_second* to clean the outdated jobs. */ - @ConfField public static int label_clean_interval_second = 4 * 3600; // 4 hours - + @ConfField + public static int label_clean_interval_second = 4 * 3600; // 4 hours + /** * the transaction will be cleaned after transaction_clean_interval_second seconds if the transaction is visible or aborted * we should make this interval as short as possible and each clean cycle as soon as possible */ - @ConfField public static int transaction_clean_interval_second = 30; + @ConfField + public static int transaction_clean_interval_second = 30; // Configurations for meta data durability /** @@ -146,14 +163,16 @@ public class Config extends ConfigBase { * 1. High write performance (SSD) * 2. Safe (RAID) */ - @ConfField public static String meta_dir = PaloFe.DORIS_HOME_DIR + "/doris-meta"; - + @ConfField + public static String meta_dir = PaloFe.DORIS_HOME_DIR + "/doris-meta"; + /** * temp dir is used to save intermediate results of some process, such as backup and restore process. * file in this dir will be cleaned after these process is finished. */ - @ConfField public static String tmp_dir = PaloFe.DORIS_HOME_DIR + "/temp_dir"; - + @ConfField + public static String tmp_dir = PaloFe.DORIS_HOME_DIR + "/temp_dir"; + /** * Edit log type. * BDB: write log to bdbje @@ -161,51 +180,56 @@ public class Config extends ConfigBase { */ @ConfField public static String edit_log_type = "BDB"; - + /** * bdbje port */ @ConfField public static int edit_log_port = 9010; - + /** * Master FE will save image every *edit_log_roll_num* meta journals. */ @ConfField(mutable = true, masterOnly = true) public static int edit_log_roll_num = 50000; - + /** * Non-master FE will stop offering service * if meta data delay gap exceeds *meta_delay_toleration_second* */ - @ConfField public static int meta_delay_toleration_second = 300; // 5 min - + @ConfField + public static int meta_delay_toleration_second = 300; // 5 min + /** * Master FE sync policy of bdbje. * If you only deploy one Follower FE, set this to 'SYNC'. If you deploy more than 3 Follower FE, * you can set this and the following 'replica_sync_policy' to WRITE_NO_SYNC. * more info, see: http://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.SyncPolicy.html */ - @ConfField public static String master_sync_policy = "SYNC"; // SYNC, NO_SYNC, WRITE_NO_SYNC - + @ConfField + public static String master_sync_policy = "SYNC"; // SYNC, NO_SYNC, WRITE_NO_SYNC + /** * Follower FE sync policy of bdbje. */ - @ConfField public static String replica_sync_policy = "SYNC"; // SYNC, NO_SYNC, WRITE_NO_SYNC - + @ConfField + public static String replica_sync_policy = "SYNC"; // SYNC, NO_SYNC, WRITE_NO_SYNC + /** * Replica ack policy of bdbje. * more info, see: http://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.ReplicaAckPolicy.html */ - @ConfField public static String replica_ack_policy = "SIMPLE_MAJORITY"; // ALL, NONE, SIMPLE_MAJORITY - + @ConfField + public static String replica_ack_policy = "SIMPLE_MAJORITY"; // ALL, NONE, SIMPLE_MAJORITY + /** * The heartbeat timeout of bdbje between master and follower. * the default is 30 seconds, which is same as default value in bdbje. * If the network is experiencing transient problems, of some unexpected long java GC annoying you, * you can try to increase this value to decrease the chances of false timeouts */ - @ConfField public static int bdbje_heartbeat_timeout_second = 30; + @ConfField + public static int bdbje_heartbeat_timeout_second = 30; /** * The lock timeout of bdbje operation @@ -235,7 +259,8 @@ public class Config extends ConfigBase { /** * the max txn number which bdbje can rollback when trying to rejoin the group */ - @ConfField public static int txn_rollback_limit = 100; + @ConfField + public static int txn_rollback_limit = 100; /** * Specified an IP for frontend, instead of the ip get by *InetAddress.getByName*. @@ -243,7 +268,8 @@ public class Config extends ConfigBase { * Default is "0.0.0.0", which means not set. * CAN NOT set this as a hostname, only IP. */ - @ConfField public static String frontend_address = "0.0.0.0"; + @ConfField + public static String frontend_address = "0.0.0.0"; /** * Declare a selection strategy for those servers have many ips. @@ -251,7 +277,8 @@ public class Config extends ConfigBase { * this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24 * If no ip match this rule, will choose one randomly. */ - @ConfField public static String priority_networks = ""; + @ConfField + public static String priority_networks = ""; /** * If true, FE will reset bdbje replication group(that is, to remove all electable nodes info) @@ -259,13 +286,14 @@ public class Config extends ConfigBase { * If all the electable nodes can not start, we can copy the meta data * to another node and set this config to true to try to restart the FE. */ - @ConfField public static String metadata_failure_recovery = "false"; + @ConfField + public static String metadata_failure_recovery = "false"; /** * If true, non-master FE will ignore the meta data delay gap between Master FE and its self, * even if the metadata delay gap exceeds *meta_delay_toleration_second*. * Non-master FE will still offer read service. - * + *

* This is helpful when you try to stop the Master FE for a relatively long time for some reason, * but still wish the non-master FE can offer read service. */ @@ -277,146 +305,164 @@ public class Config extends ConfigBase { * This value is checked whenever a non-master FE establishes a connection to master FE via BDBJE. * The connection is abandoned if the clock skew is larger than this value. */ - @ConfField public static long max_bdbje_clock_delta_ms = 5000; // 5s + @ConfField + public static long max_bdbje_clock_delta_ms = 5000; // 5s /** * Fe http port * Currently, all FEs' http port must be same. */ - @ConfField public static int http_port = 8030; + @ConfField + public static int http_port = 8030; /* * Netty http param */ - @ConfField public static int http_max_line_length = HttpServer.DEFAULT_MAX_LINE_LENGTH; + @ConfField + public static int http_max_line_length = HttpServer.DEFAULT_MAX_LINE_LENGTH; - @ConfField public static int http_max_header_size = HttpServer.DEFAULT_MAX_HEADER_SIZE; + @ConfField + public static int http_max_header_size = HttpServer.DEFAULT_MAX_HEADER_SIZE; - @ConfField public static int http_max_chunk_size = HttpServer.DEFAULT_MAX_CHUNK_SIZE; + @ConfField + public static int http_max_chunk_size = HttpServer.DEFAULT_MAX_CHUNK_SIZE; /** * The backlog_num for netty http server * When you enlarge this backlog_num, you should ensure it's value larger than * the linux /proc/sys/net/core/somaxconn config */ - @ConfField public static int http_backlog_num = 1024; + @ConfField + public static int http_backlog_num = 1024; /** * The connection timeout and socket timeout config for thrift server * The value for thrift_client_timeout_ms is set to be larger than zero to prevent * some hang up problems in java.net.SocketInputStream.socketRead0 */ - @ConfField public static int thrift_client_timeout_ms = 30000; + @ConfField + public static int thrift_client_timeout_ms = 30000; /** * The backlog_num for thrift server * When you enlarge this backlog_num, you should ensure it's value larger than * the linux /proc/sys/net/core/somaxconn config */ - @ConfField public static int thrift_backlog_num = 1024; + @ConfField + public static int thrift_backlog_num = 1024; /** * FE thrift server port */ - @ConfField public static int rpc_port = 9020; - + @ConfField + public static int rpc_port = 9020; + /** * FE mysql server port */ - @ConfField public static int query_port = 9030; + @ConfField + public static int query_port = 9030; /** * mysql service nio option. */ - @ConfField public static boolean mysql_service_nio_enabled = true; + @ConfField + public static boolean mysql_service_nio_enabled = true; /** * num of thread to handle io events in mysql. */ - @ConfField public static int mysql_service_io_threads_num = 4; + @ConfField + public static int mysql_service_io_threads_num = 4; /** * max num of thread to handle task in mysql. */ - @ConfField public static int max_mysql_service_task_threads_num = 4096; + @ConfField + public static int max_mysql_service_task_threads_num = 4096; /** * Cluster name will be shown as the title of web page */ - @ConfField public static String cluster_name = "Baidu Palo"; - + @ConfField + public static String cluster_name = "Baidu Palo"; + /** * node(FE or BE) will be considered belonging to the same Palo cluster if they have same cluster id. * Cluster id is usually a random integer generated when master FE start at first time. * You can also sepecify one. */ - @ConfField public static int cluster_id = -1; - + @ConfField + public static int cluster_id = -1; + /** * Cluster token used for internal authentication. */ - @ConfField public static String auth_token = ""; + @ConfField + public static String auth_token = ""; // Configurations for load, clone, create table, alter table etc. We will rarely change them /** * Maximal waiting time for creating a single replica. * eg. - * if you create a table with #m tablets and #n replicas for each tablet, - * the create table request will run at most (m * n * tablet_create_timeout_second) before timeout. + * if you create a table with #m tablets and #n replicas for each tablet, + * the create table request will run at most (m * n * tablet_create_timeout_second) before timeout. */ @ConfField(mutable = true, masterOnly = true) public static int tablet_create_timeout_second = 1; - + /** * In order not to wait too long for create table(index), set a max timeout. */ @ConfField(mutable = true, masterOnly = true) public static int max_create_table_timeout_second = 60; - + /** * Maximal waiting time for all publish version tasks of one transaction to be finished */ @ConfField(mutable = true, masterOnly = true) public static int publish_version_timeout_second = 30; // 30 seconds - + /** * minimal intervals between two publish version action */ - @ConfField public static int publish_version_interval_ms = 10; + @ConfField + public static int publish_version_interval_ms = 10; /** * The thrift server max worker threads */ - @ConfField public static int thrift_server_max_worker_threads = 4096; + @ConfField + public static int thrift_server_max_worker_threads = 4096; /** * Maximal wait seconds for straggler node in load * eg. - * there are 3 replicas A, B, C - * load is already quorum finished(A,B) at t1 and C is not finished - * if (current_time - t1) > 300s, then palo will treat C as a failure node - * will call transaction manager to commit the transaction and tell transaction manager - * that C is failed - * + * there are 3 replicas A, B, C + * load is already quorum finished(A,B) at t1 and C is not finished + * if (current_time - t1) > 300s, then palo will treat C as a failure node + * will call transaction manager to commit the transaction and tell transaction manager + * that C is failed + *

* This is also used when waiting for publish tasks - * + *

* TODO this parameter is the default value for all job and the DBA could specify it for separate job */ @ConfField(mutable = true, masterOnly = true) public static int load_straggler_wait_second = 300; - + /** * Maximal memory layout length of a row. default is 100 KB. * In BE, the maximal size of a RowBlock is 100MB(Configure as max_unpacked_row_block_size in be.conf). * And each RowBlock contains 1024 rows. So the maximal size of a row is approximately 100 KB. - * + *

* eg. - * schema: k1(int), v1(decimal), v2(varchar(2000)) - * then the memory layout length of a row is: 8(int) + 40(decimal) + 2000(varchar) = 2048 (Bytes) - * + * schema: k1(int), v1(decimal), v2(varchar(2000)) + * then the memory layout length of a row is: 8(int) + 40(decimal) + 2000(varchar) = 2048 (Bytes) + *

* See memory layout length of all types, run 'help create table' in mysql-client. - * - * If you want to increase this number to support more columns in a row, you also need to increase the + *

+ * If you want to increase this number to support more columns in a row, you also need to increase the * max_unpacked_row_block_size in be.conf. But the performance impact is unknown. */ @ConfField(mutable = true, masterOnly = true) @@ -426,10 +472,11 @@ public class Config extends ConfigBase { * The load scheduler running interval. * A load job will transfer its state from PENDING to LOADING to FINISHED. * The load scheduler will transfer load job from PENDING to LOADING - * while the txn callback will transfer load job from LOADING to FINISHED. + * while the txn callback will transfer load job from LOADING to FINISHED. * So a load job will cost at most one interval to finish when the concurrency has not reached the upper limit. */ - @ConfField public static int load_checker_interval_second = 5; + @ConfField + public static int load_checker_interval_second = 5; /** * Concurrency of HIGH priority pending load jobs. @@ -440,26 +487,31 @@ public class Config extends ConfigBase { * Currently, you can not specified the job priority manually, * and do not change this if you know what you are doing. */ - @ConfField public static int load_pending_thread_num_high_priority = 3; + @ConfField + public static int load_pending_thread_num_high_priority = 3; /** * Concurrency of NORMAL priority pending load jobs. * Do not change this if you know what you are doing. */ - @ConfField public static int load_pending_thread_num_normal_priority = 10; + @ConfField + public static int load_pending_thread_num_normal_priority = 10; /** * Concurrency of HIGH priority etl load jobs. * Do not change this if you know what you are doing. */ - @ConfField public static int load_etl_thread_num_high_priority = 3; + @ConfField + public static int load_etl_thread_num_high_priority = 3; /** * Concurrency of NORMAL priority etl load jobs. * Do not change this if you know what you are doing. */ - @ConfField public static int load_etl_thread_num_normal_priority = 10; + @ConfField + public static int load_etl_thread_num_normal_priority = 10; /** * Concurrency of delete jobs. */ - @ConfField public static int delete_thread_num = 10; + @ConfField + public static int delete_thread_num = 10; /** * Not available. */ @@ -482,13 +534,13 @@ public class Config extends ConfigBase { @Deprecated @ConfField(mutable = true, masterOnly = true) public static int mini_load_default_timeout_second = 3600; // 1 hour - + /** * Default insert load timeout */ @ConfField(mutable = true, masterOnly = true) public static int insert_load_default_timeout_second = 3600; // 1 hour - + /** * Default stream load and streaming mini load timeout */ @@ -550,7 +602,7 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int desired_max_waiting_jobs = 100; - + /** * maximun concurrent running txn num including prepare, commit txns under a single db * txn manager will reject coming txns @@ -574,7 +626,8 @@ public class Config extends ConfigBase { /** * Clone checker's running interval. */ - @ConfField public static int clone_checker_interval_second = 300; + @ConfField + public static int clone_checker_interval_second = 300; /** * Default timeout of a single clone job. Set long enough to fit your replica size. * The larger the replica data size is, the more time is will cost to finish clone. @@ -593,7 +646,7 @@ public class Config extends ConfigBase { * If the priority is LOW, it will be delayed *clone_low_priority_delay_second* * after the job creation and then be executed. * This is to avoid a large number of clone jobs running at same time only because a host is down for a short time. - * + *

* NOTICE that this config(and *clone_normal_priority_delay_second* as well) * will not work if it's smaller then *clone_checker_interval_second* */ @@ -650,14 +703,16 @@ public class Config extends ConfigBase { * When create a table(or partition), you can specify its storage medium(HDD or SSD). * If not set, this specifies the default medium when creat. */ - @ConfField public static String default_storage_medium = "HDD"; + @ConfField + public static String default_storage_medium = "HDD"; /** * When create a table(or partition), you can specify its storage medium(HDD or SSD). * If set to SSD, this specifies the default duration that tablets will stay on SSD. * After that, tablets will be moved to HDD automatically. * You can set storage cooldown time in CREATE TABLE stmt. */ - @ConfField public static long storage_cooldown_second = 30 * 24 * 3600L; // 30 days + @ConfField + public static long storage_cooldown_second = 30 * 24 * 3600L; // 30 days /** * After dropping database(table/partition), you can recover it by using RECOVER stmt. * And this specifies the maximal data retention time. After time, the data will be deleted permanently. @@ -680,7 +735,8 @@ public class Config extends ConfigBase { /** * Export checker's running interval. */ - @ConfField public static int export_checker_interval_second = 5; + @ConfField + public static int export_checker_interval_second = 5; /** * Limitation of the concurrency of running export jobs. * Default is 5. @@ -718,12 +774,14 @@ public class Config extends ConfigBase { /** * Maximal number of connections per FE. */ - @ConfField public static int qe_max_connection = 1024; + @ConfField + public static int qe_max_connection = 1024; /** * Maximal number of thread in connection-scheduler-pool. */ - @ConfField public static int max_connection_scheduler_threads_num = 4096; + @ConfField + public static int max_connection_scheduler_threads_num = 4096; /** * The memory_limit for colocote join PlanFragment instance = @@ -740,9 +798,12 @@ public class Config extends ConfigBase { /** * The default user resource publishing timeout. */ - @ConfField public static int meta_publish_timeout_ms = 1000; - @ConfField public static boolean proxy_auth_enable = false; - @ConfField public static String proxy_auth_magic_prefix = "x@8"; + @ConfField + public static int meta_publish_timeout_ms = 1000; + @ConfField + public static boolean proxy_auth_enable = false; + @ConfField + public static String proxy_auth_magic_prefix = "x@8"; /** * Limit on the number of expr children of an expr tree. * Exceed this limit may cause long analysis time while holding database read lock. @@ -763,16 +824,21 @@ public class Config extends ConfigBase { * Plugins' path for BACKUP and RESTORE operations. Currently deprecated. */ @Deprecated - @ConfField public static String backup_plugin_path = "/tools/trans_file_tool/trans_files.sh"; + @ConfField + public static String backup_plugin_path = "/tools/trans_file_tool/trans_files.sh"; // Configurations for hadoop dpp /** * The following configurations are not available. */ - @ConfField public static String dpp_hadoop_client_path = "/lib/hadoop-client/hadoop/bin/hadoop"; - @ConfField public static long dpp_bytes_per_reduce = 100 * 1024 * 1024L; // 100M - @ConfField public static String dpp_default_cluster = "palo-dpp"; - @ConfField public static String dpp_default_config_str = "" + @ConfField + public static String dpp_hadoop_client_path = "/lib/hadoop-client/hadoop/bin/hadoop"; + @ConfField + public static long dpp_bytes_per_reduce = 100 * 1024 * 1024L; // 100M + @ConfField + public static String dpp_default_cluster = "palo-dpp"; + @ConfField + public static String dpp_default_config_str = "" + "{" + "hadoop_configs : '" + "mapred.job.priority=NORMAL;" @@ -784,7 +850,8 @@ public class Config extends ConfigBase { + "dfs.client.authserver.force_stop=true;" + "dfs.client.auth.method=0" + "'}"; - @ConfField public static String dpp_config_str = "" + @ConfField + public static String dpp_config_str = "" + "{palo-dpp : {" + "hadoop_palo_path : '/dir'," + "hadoop_configs : '" @@ -796,28 +863,32 @@ public class Config extends ConfigBase { // For forward compatibility, will be removed later. // check token when download image file. - @ConfField public static boolean enable_token_check = true; + @ConfField + public static boolean enable_token_check = true; /** * Set to true if you deploy Palo using thirdparty deploy manager * Valid options are: - * disable: no deploy manager - * k8s: Kubernetes - * ambari: Ambari - * local: Local File (for test or Boxer2 BCC version) + * disable: no deploy manager + * k8s: Kubernetes + * ambari: Ambari + * local: Local File (for test or Boxer2 BCC version) */ - @ConfField public static String enable_deploy_manager = "disable"; - + @ConfField + public static String enable_deploy_manager = "disable"; + // If use k8s deploy manager locally, set this to true and prepare the certs files - @ConfField public static boolean with_k8s_certs = false; - + @ConfField + public static boolean with_k8s_certs = false; + // Set runtime locale when exec some cmds - @ConfField public static String locale = "zh_CN.UTF-8"; + @ConfField + public static String locale = "zh_CN.UTF-8"; // default timeout of backup job @ConfField(mutable = true, masterOnly = true) public static int backup_job_default_timeout_ms = 86400 * 1000; // 1 day - + /** * 'storage_high_watermark_usage_percent' limit the max capacity usage percent of a Backend storage path. * 'storage_min_left_capacity_bytes' limit the minimum left capacity of a Backend storage path. @@ -842,34 +913,38 @@ public class Config extends ConfigBase { // update interval of tablet stat // All frontends will get tablet stat from all backends at each interval - @ConfField public static int tablet_stat_update_interval_second = 300; // 5 min + @ConfField + public static int tablet_stat_update_interval_second = 300; // 5 min // May be necessary to modify the following BRPC configurations in high concurrency scenarios. // The number of concurrent requests BRPC can processed - @ConfField public static int brpc_number_of_concurrent_requests_processed = 4096; + @ConfField + public static int brpc_number_of_concurrent_requests_processed = 4096; // BRPC idle wait time (ms) - @ConfField public static int brpc_idle_wait_max_time = 10000; - + @ConfField + public static int brpc_idle_wait_max_time = 10000; + /** - * if set to false, auth check will be disable, in case some goes wrong with the new privilege system. + * if set to false, auth check will be disable, in case some goes wrong with the new privilege system. */ - @ConfField public static boolean enable_auth_check = true; - + @ConfField + public static boolean enable_auth_check = true; + /** * Max bytes a broker scanner can process in one broker load job. * Commonly, each Backends has one broker scanner. */ @ConfField(mutable = true, masterOnly = true) public static long max_bytes_per_broker_scanner = 3 * 1024 * 1024 * 1024L; // 3G - + /** * Max number of load jobs, include PENDING、ETL、LOADING、QUORUM_FINISHED. * If exceed this number, load job is not allowed to be submitted. */ @ConfField(mutable = true, masterOnly = true) public static long max_unfinished_load_job = 1000; - + /** * If set to true, Planner will try to select replica of tablet on same host as this Frontend. * This may reduce network transmission in following case: @@ -880,7 +955,7 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static boolean enable_local_replica_selection = false; - + /** * The timeout of executing async remote fragment. * In normal case, the async remote fragment will be executed in a short time. If system are under high load @@ -888,9 +963,9 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec - + /** - * The number of query retries. + * The number of query retries. * A query may retry if we encounter RPC exception and no result has been sent to user. * You may reduce this number to avoid Avalanche disaster. */ @@ -903,12 +978,12 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static long catalog_try_lock_timeout_ms = 5000; // 5 sec - + /** * if this is set to true - * all pending load job will failed when call begin txn api - * all prepare load job will failed when call commit txn api - * all committed load job will waiting to be published + * all pending load job will failed when call begin txn api + * all prepare load job will failed when call commit txn api + * all committed load job will waiting to be published */ @ConfField(mutable = true, masterOnly = true) public static boolean disable_load_job = false; @@ -918,20 +993,20 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int db_used_data_quota_update_interval_secs = 300; - + /** * Load using hadoop cluster will be deprecated in future. * Set to true to disable this kind of load. */ @ConfField(mutable = true, masterOnly = true) public static boolean disable_hadoop_load = false; - + /** * fe will call es api to get es index shard info every es_state_sync_interval_secs */ @ConfField public static long es_state_sync_interval_second = 10; - + /** * the factor of delay time before deciding to repair tablet. * if priority is VERY_HIGH, repair it immediately. @@ -941,17 +1016,19 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static long tablet_repair_delay_factor_second = 60; - + /** * the default slot number per path in tablet scheduler * TODO(cmy): remove this config and dynamically adjust it by clone task statistic */ - @ConfField public static int schedule_slot_num_per_path = 2; - + @ConfField + public static int schedule_slot_num_per_path = 2; + /** * Deprecated after 0.10 */ - @ConfField public static boolean use_new_tablet_scheduler = true; + @ConfField + public static boolean use_new_tablet_scheduler = true; /** * the threshold of cluster balance score, if a backend's load score is 10% lower than average score, @@ -990,11 +1067,12 @@ public class Config extends ConfigBase { // 10000 replicas: 200ms @ConfField(mutable = true, masterOnly = true) public static int report_queue_size = 100; - + /** * If set to true, metric collector will be run as a daemon timer to collect metrics at fix interval */ - @ConfField public static boolean enable_metric_calculator = true; + @ConfField + public static boolean enable_metric_calculator = true; /** * the max routine load job num, including NEED_SCHEDULED, RUNNING, PAUSE @@ -1018,13 +1096,13 @@ public class Config extends ConfigBase { public static int max_routine_load_task_num_per_be = 5; /** - * The max number of files store in SmallFileMgr + * The max number of files store in SmallFileMgr */ @ConfField(mutable = true, masterOnly = true) public static int max_small_file_number = 100; /** - * The max size of a single file store in SmallFileMgr + * The max size of a single file store in SmallFileMgr */ @ConfField(mutable = true, masterOnly = true) public static int max_small_file_size_bytes = 1024 * 1024; // 1MB @@ -1032,15 +1110,18 @@ public class Config extends ConfigBase { /** * Save small files */ - @ConfField public static String small_file_dir = PaloFe.DORIS_HOME_DIR + "/small_files"; - + @ConfField + public static String small_file_dir = PaloFe.DORIS_HOME_DIR + "/small_files"; + /** * The following 2 configs can set to true to disable the automatic colocate tables's relocate and balance. * if 'disable_colocate_relocate' is set to true, ColocateTableBalancer will not relocate colocate tables when Backend unavailable. * if 'disable_colocate_balance' is set to true, ColocateTableBalancer will not balance colocate tables. */ - @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_relocate = false; - @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_balance = false; + @ConfField(mutable = true, masterOnly = true) + public static boolean disable_colocate_relocate = false; + @ConfField(mutable = true, masterOnly = true) + public static boolean disable_colocate_balance = false; /** * If set to true, the insert stmt with processing error will still return a label to user. @@ -1048,14 +1129,15 @@ public class Config extends ConfigBase { * The default value is false, which means if insert operation encounter errors, * exception will be thrown to user client directly without load label. */ - @ConfField(mutable = true, masterOnly = true) public static boolean using_old_load_usage_pattern = false; + @ConfField(mutable = true, masterOnly = true) + public static boolean using_old_load_usage_pattern = false; /** * This will limit the max recursion depth of hash distribution pruner. * eg: where a in (5 elements) and b in (4 elements) and c in (3 elements) and d in (2 elements). * a/b/c/d are distribution columns, so the recursion depth will be 5 * 4 * 3 * 2 = 120, larger than 100, * So that distribution pruner will no work and just return all buckets. - * + *

* Increase the depth can support distribution pruning for more elements, but may cost more CPU. */ @ConfField(mutable = true, masterOnly = false) @@ -1077,10 +1159,10 @@ public class Config extends ConfigBase { /** * The multi cluster feature will be deprecated in version 0.12 * set this config to true will disable all operations related to cluster feature, include: - * create/drop cluster - * add free backend/add backend to cluster/decommission cluster balance - * change the backends num of cluster - * link/migration db + * create/drop cluster + * add free backend/add backend to cluster/decommission cluster balance + * change the backends num of cluster + * link/migration db */ @ConfField(mutable = true) public static boolean disable_cluster_feature = true; @@ -1156,7 +1238,7 @@ public class Config extends ConfigBase { * This config will decide whether to resend agent task when create_time for agent_task is set, * only when current_time - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task */ - @ConfField (mutable = true, masterOnly = true) + @ConfField(mutable = true, masterOnly = true) public static long agent_task_resend_wait_time_ms = 5000; /** @@ -1172,10 +1254,10 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static long max_clone_task_timeout_sec = 2 * 60 * 60; // 2h - /** + /** * If set to true, fe will enable sql result cache * This option is suitable for offline data update scenarios - * case1 case2 case3 case4 + * case1 case2 case3 case4 * enable_sql_cache false true true false * enable_partition_cache false false true true */ @@ -1190,8 +1272,8 @@ public class Config extends ConfigBase { public static boolean cache_enable_partition_mode = true; /** - * Minimum interval between last version when caching results, - * This parameter distinguishes between offline and real-time updates + * Minimum interval between last version when caching results, + * This parameter distinguishes between offline and real-time updates */ @ConfField(mutable = true, masterOnly = false) public static int cache_last_version_interval_second = 900; @@ -1201,14 +1283,14 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = false) public static int cache_result_max_row_count = 3000; - + /** * Used to limit element num of InPredicate in delete statement. */ @ConfField(mutable = true, masterOnly = true) public static int max_allowed_in_element_num_of_delete = 1024; - - /** + + /** * In some cases, some tablets may have all replicas damaged or lost. * At this time, the data has been lost, and the damaged tablets * will cause the entire query to fail, and the remaining healthy tablets cannot be queried. @@ -1231,11 +1313,17 @@ public class Config extends ConfigBase { * Specify how long an entry will be expired in milliseconds, 10000 by default. */ @ConfField(mutable = true) - public static long result_cache_expire_after_in_milliseconds = 10*1000; + public static long result_cache_expire_after_in_milliseconds = 10 * 1000; /** * Specify the overall threshold of local cache in bytes, 1G bytes by default. */ @ConfField(mutable = true) public static long result_cache_size_in_bytes = 1024 * 1024 * 1024; + + /** + * Max Result Size per query + */ + @ConfField(mutable = true) + public static long result_cache_size_per_query_in_bytes = 1024 * 1024; } \ No newline at end of file From 6c2ca4690139acdf917193ec4d0acda39b7a148b Mon Sep 17 00:00:00 2001 From: turbo jason Date: Thu, 27 Aug 2020 10:46:45 +0800 Subject: [PATCH 4/7] [apache][doris][fe result cache] fix some comment --- .../main/java/org/apache/doris/cache/SimpleLocalCache.java | 4 ++-- .../src/main/java/org/apache/doris/qe/StmtExecutor.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java b/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java index 94fb7035f15385..bb3895a36dddf5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java @@ -88,11 +88,11 @@ public byte[] get(NamedKey key) { public void put(NamedKey key, byte[] value) { final byte[] compresssize = compress(value); if (compresssize.length > Config.result_cache_size_per_query_in_bytes) { - LOG.info(" result size more than result_cache_size_per_query_in_bytes: " + LOG.debug(" result size more than result_cache_size_per_query_in_bytes: " + Config.result_cache_size_per_query_in_bytes + " so not storage in cache"); return; } - cache.put(key, compress(value)); + cache.put(key, compresssize); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5390f5b8f333e3..b4e6cd546f04db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -598,8 +598,8 @@ private boolean handleQueryStmt() throws Exception { QueryStmt queryStmt = (QueryStmt) parsedStmt; // Use connection ID as session identifier NamedKey namedKey = new NamedKey(String.valueOf(context.getConnectionId()), - StringUtils.toUtf8(queryStmt.toSql())); - LOG.debug("Result Cache NamedKey [" + namedKey + "]"); + StringUtils.toUtf8(originStmt.originStmt)); + LOG.debug("Result Cache NamedKey [{}]", namedKey); QueryDetail queryDetail = new QueryDetail(context.getStartTime(), @@ -622,7 +622,7 @@ private boolean handleQueryStmt() throws Exception { new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); boolean isCacheEnabled = context.getSessionVariable().isEnableResultCache(); - LOG.debug("Session level cache is " + (isCacheEnabled ? "enabled" : false)); + LOG.debug("Session level cache is {}", (isCacheEnabled ? "enabled" : false)); Cache cache = null; byte[] cachedVal = null; ArrayList batches = null; From adc3772c5aa5a05066b7600fb59f75fda764af24 Mon Sep 17 00:00:00 2001 From: turbo jason Date: Thu, 27 Aug 2020 10:59:28 +0800 Subject: [PATCH 5/7] [apache][doris] add handleQueryStmt comment --- .../src/main/java/org/apache/doris/qe/StmtExecutor.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index b4e6cd546f04db..f1ae3423a67eb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -591,7 +591,10 @@ private void handleSetStmt() { context.getState().setOk(); } - // Process a select statement. + /** + * Process a select statement. + * if return true ,hit cache + */ private boolean handleQueryStmt() throws Exception { // Every time set no send flag and clean all data in buffer context.getMysqlChannel().reset(); From dd42c6b580afcbb53436dd8b531e03e5fa5d01ed Mon Sep 17 00:00:00 2001 From: turbo jason Date: Thu, 27 Aug 2020 14:10:03 +0800 Subject: [PATCH 6/7] [apache][doris]fix LOG class in SessionVariable --- .../src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 3e87241ed94dc0..7fb3fbd938355c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -38,7 +38,7 @@ // System variable public class SessionVariable implements Serializable, Writable { - static final Logger LOG = LogManager.getLogger(StmtExecutor.class); + static final Logger LOG = LogManager.getLogger(SessionVariable.class); public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; public static final String QUERY_TIMEOUT = "query_timeout"; public static final String IS_REPORT_SUCCESS = "is_report_success"; From fbcd2d2904b732e458423016d31fef3108819411 Mon Sep 17 00:00:00 2001 From: turbo jason Date: Fri, 5 Mar 2021 11:33:15 +0800 Subject: [PATCH 7/7] [fe][feture][cache] modify parameter and add which in fe_config --- .../administrator-guide/config/fe_config.md | 19 +++++++++ .../administrator-guide/config/fe_config.md | 15 +++++++ .../main/java/org/apache/doris/PaloFe.java | 4 +- .../apache/doris/cache/SimpleLocalCache.java | 16 +++---- .../java/org/apache/doris/common/Config.java | 12 +++--- .../org/apache/doris/metric/MetricRepo.java | 42 +++++++++---------- .../org/apache/doris/qe/SessionVariable.java | 4 +- 7 files changed, 73 insertions(+), 39 deletions(-) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 9bcb9c0d3e5ae0..12e5b917e0de47 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -682,3 +682,22 @@ In some very special circumstances, such as code bugs, or human misoperation, et Set to true so that Doris will automatically use blank replicas to fill tablets which all replicas have been damaged or missing. Default is false. + + +#### `enable_result_cache_ttl` + +enable_result_ cache_ ttl variable is set in the user session. The user can customize whether to turn it on or not. The TTL time is used to determine whether the user's SQL uses cache. The correctness of the data is not guaranteed when the data is changed` + +The cache is stored and retrieved according to the user connected and the query SQL. If it exceeds the cache expiration time, the cache will not be hit and the cache will be cleaned + +### `result_cache_ttl_expire_after_in_milliseconds` + +enable_result_cache_ttl cache time + +### `result_cache_ttl_size_in_bytes` + +enable_result_cache_ttl cache size + +### `result_cache_ttl_size_per_query_in_bytes` + +enable_result_cache_ttl single query result max cache size \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index 3f8ef1fc94d6d1..860e3958bbb6bb 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -679,6 +679,21 @@ thrift_client_timeout_ms 的值被设置为大于0来避免线程卡在java.net. 默认为 false。 +### `enable_result_cache_ttl` +result_cache_ttl 变量设置在用户Session中,用户可自定义是否开启,通过ttl时间来确定用户的sql是否使用缓存,`这里数据变更时不保证数据的正确性` +按照 用户 connectid,和查询的sql 来存储和获取缓存,超过缓存失效时间则命中不了缓存,该缓存也会被清理 + +### `result_cache_ttl_expire_after_in_milliseconds` + +enable_result_cache_ttl 失效销毁间隔 + +### `result_cache_ttl_size_in_bytes` + +enable_result_cache_ttl 缓存大小 + +### `result_cache_ttl_size_per_query_in_bytes` + +enable_result_cache_ttl 单个query结果,最大缓存大小 diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java index de71acfe11ce87..039158b10c6f2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java +++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java @@ -106,8 +106,8 @@ public static void start(String dorisHomeDir, String pidDir, String[] args) { Catalog.getCurrentCatalog().initialize(args); Catalog.getCurrentCatalog().waitForReady(); // Initialize the result cache if enabled - LOG.debug("result cache is " + (Config.enable_result_cache ? "enabled" : "disabled")); - if (Config.enable_result_cache) { + LOG.debug("result cache is " + (Config.enable_result_cache_ttl ? "enabled" : "disabled")); + if (Config.enable_result_cache_ttl) { CacheFactory.getUniversalCache(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java b/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java index bb3895a36dddf5..6d26a9a8abd668 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java @@ -56,14 +56,14 @@ public static Cache create() { // Used in testing public static Cache create(final Executor executor) { - LOG.info("Instance cache with expiration " + Config.result_cache_expire_after_in_milliseconds - + " milliseconds, max size " + Config.result_cache_size_in_bytes + " bytes"); + LOG.info("Instance cache with expiration " + Config.result_cache_ttl_expire_after_in_milliseconds + + " milliseconds, max size " + Config.result_cache_ttl_size_in_bytes + " bytes"); Caffeine builder = Caffeine.newBuilder().recordStats(); - if (Config.result_cache_expire_after_in_milliseconds >= 0) { + if (Config.result_cache_ttl_expire_after_in_milliseconds >= 0) { builder.expireAfterWrite(Config.result_cache_expire_after_in_milliseconds, TimeUnit.MILLISECONDS); } - if (Config.result_cache_size_in_bytes >= 0) { - builder.maximumWeight(Config.result_cache_size_in_bytes); + if (Config.result_cache_ttl_size_in_bytes >= 0) { + builder.maximumWeight(Config.result_cache_ttl_size_in_bytes); } else { builder.maximumWeight(Math.min(MAX_DEFAULT_BYTES, Runtime.getRuntime().maxMemory() / 10)); } @@ -87,9 +87,9 @@ public byte[] get(NamedKey key) { @Override public void put(NamedKey key, byte[] value) { final byte[] compresssize = compress(value); - if (compresssize.length > Config.result_cache_size_per_query_in_bytes) { - LOG.debug(" result size more than result_cache_size_per_query_in_bytes: " - + Config.result_cache_size_per_query_in_bytes + " so not storage in cache"); + if (compresssize.length > Config.result_cache_ttl_size_per_query_in_bytes) { + LOG.debug(" result size more than result_cache_ttl_size_per_query_in_bytes: " + + Config.result_cache_ttl_size_per_query_in_bytes + " so not storage in cache"); return; } cache.put(key, compresssize); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 40d02d3f488a0f..4d0f8013156062 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1302,28 +1302,28 @@ public class Config extends ConfigBase { public static boolean recover_with_empty_tablet = false; /** - * Sql_result_cache - * Whether or not the result cache is enabled in Fe level, it can be overwritten with connection/session + * enable_result_cache_ttl + * Whether or not the result cache ttl is enabled in Fe level, it can be overwritten with connection/session * level setting in Context. */ @ConfField(mutable = true) - public static boolean enable_result_cache = false; + public static boolean enable_result_cache_ttl = false; /** * Specify how long an entry will be expired in milliseconds, 10000 by default. */ @ConfField(mutable = true) - public static long result_cache_expire_after_in_milliseconds = 10 * 1000; + public static long result_cache_ttl_expire_after_in_milliseconds = 10 * 1000; /** * Specify the overall threshold of local cache in bytes, 1G bytes by default. */ @ConfField(mutable = true) - public static long result_cache_size_in_bytes = 1024 * 1024 * 1024; + public static long result_cache_ttl_size_in_bytes = 1024 * 1024 * 1024; /** * Max Result Size per query */ @ConfField(mutable = true) - public static long result_cache_size_per_query_in_bytes = 1024 * 1024; + public static long result_cache_ttl_size_per_query_in_bytes = 1024 * 1024; } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index 677e35f4875cfe..81e94cd0229402 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -85,13 +85,13 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS; // Metrics for the result cache - public static LongCounterMetric COUNTER_RESULT_CACHE_HITS; - public static LongCounterMetric COUNTER_RESULT_CACHE_MISSES; - public static GaugeMetric GAUGE_RESULT_CACHE_ENTRIES; - public static GaugeMetric GAUGE_RESULT_CACHE_SIZE_IN_BYTES; - public static LongCounterMetric COUNTER_RESULT_CACHE_EVICTIONS; - public static LongCounterMetric COUNTER_RESULT_CACHE_TIMEOUTS; - public static LongCounterMetric COUNTER_RESULT_CACHE_ERRORS; + public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_HITS; + public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_MISSES; + public static GaugeMetric GAUGE_RESULT_CACHE_TTL_ENTRIES; + public static GaugeMetric GAUGE_RESULT_CACHE_TTL_SIZE_IN_BYTES; + public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_EVICTIONS; + public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_TIMEOUTS; + public static LongCounterMetric COUNTER_RESULT_CACHE_TTL_ERRORS; public static Histogram HISTO_QUERY_LATENCY; public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY; @@ -278,20 +278,20 @@ public Long getValue() { "total error rows of routine load"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS); - COUNTER_RESULT_CACHE_HITS = new LongCounterMetric("result_cache_hits", MetricUnit.NOUNIT,"Accumulated number of cache hits"); - PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_HITS); - COUNTER_RESULT_CACHE_MISSES = new LongCounterMetric("result_cache_misses", MetricUnit.NOUNIT,"Accumulated number of cache misses"); - PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_MISSES); - GAUGE_RESULT_CACHE_ENTRIES = new GaugeMetric("result_cache_entries", MetricUnit.NOUNIT,"Accumulated number of cache size by entries"); - PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_RESULT_CACHE_ENTRIES); - GAUGE_RESULT_CACHE_SIZE_IN_BYTES = new GaugeMetric("result_cache_size_in_bytes", MetricUnit.BYTES,"Accumulated number of cache size by bytes"); - PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_RESULT_CACHE_SIZE_IN_BYTES); - COUNTER_RESULT_CACHE_EVICTIONS = new LongCounterMetric("result_cache_evictions", MetricUnit.NOUNIT,"Accumulated number of cache evictions"); - PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_EVICTIONS); - COUNTER_RESULT_CACHE_TIMEOUTS = new LongCounterMetric("result_cache_timeouts", MetricUnit.NOUNIT,"Accumulated number of cache timeouts"); - PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TIMEOUTS); - COUNTER_RESULT_CACHE_ERRORS= new LongCounterMetric("result_cache_errors", MetricUnit.NOUNIT,"Accumulated number of cache errors"); - PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_ERRORS); + COUNTER_RESULT_CACHE_TTL_HITS = new LongCounterMetric("result_cache_ttl_hits", MetricUnit.NOUNIT,"Accumulated number of cache hits"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_HITS); + COUNTER_RESULT_CACHE_TTL_MISSES = new LongCounterMetric("result_cache_ttl_misses", MetricUnit.NOUNIT,"Accumulated number of cache misses"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_MISSES); + GAUGE_RESULT_CACHE_TTL_ENTRIES = new GaugeMetric("result_cache_ttl_entries", MetricUnit.NOUNIT,"Accumulated number of cache size by entries"); + PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_RESULT_CACHE_TTL_ENTRIES); + GAUGE_RESULT_CACHE_TTL_SIZE_IN_BYTES = new GaugeMetric("result_cache_ttl_size_in_bytes", MetricUnit.BYTES,"Accumulated number of cache size by bytes"); + PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_RESULT_CACHE_TTL_SIZE_IN_BYTES); + COUNTER_RESULT_CACHE_TTL_EVICTIONS = new LongCounterMetric("result_cache_ttl_evictions", MetricUnit.NOUNIT,"Accumulated number of cache evictions"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_EVICTIONS); + COUNTER_RESULT_CACHE_TTL_TIMEOUTS = new LongCounterMetric("result_cache_ttl_timeouts", MetricUnit.NOUNIT,"Accumulated number of cache timeouts"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_TIMEOUTS); + COUNTER_RESULT_CACHE_TTL_ERRORS= new LongCounterMetric("result_cache_ttl_errors", MetricUnit.NOUNIT,"Accumulated number of cache errors"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_RESULT_CACHE_TTL_ERRORS); // 3. histogram HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7fb3fbd938355c..5edcf48d1d3f60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -78,7 +78,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_SQL_CACHE = "enable_sql_cache"; public static final String ENABLE_PARTITION_CACHE = "enable_partition_cache"; - public static final String ENABLE_RESULT_CACHE = "enable_result_cache"; + public static final String ENABLE_RESULT_CACHE_TTL = "enable_result_cache_ttl"; public static final int MIN_EXEC_INSTANCE_NUM = 1; public static final int MAX_EXEC_INSTANCE_NUM = 32; @@ -234,7 +234,7 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PARTITION_CACHE) private boolean enablePartitionCache = false; - @VariableMgr.VarAttr(name = ENABLE_RESULT_CACHE) + @VariableMgr.VarAttr(name = ENABLE_RESULT_CACHE_TTL) private boolean enableResultCache = false; @VariableMgr.VarAttr(name = FORWARD_TO_MASTER)