Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, lo
* tryAddFeCache
*/
public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {
switch (connectContext.getCommand()) {
case COM_STMT_EXECUTE:
case COM_STMT_PREPARE:
return;
default: { }
}

Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
return;
Expand All @@ -149,6 +156,12 @@ public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {
* tryAddBeCache
*/
public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) {
switch (connectContext.getCommand()) {
case COM_STMT_EXECUTE:
case COM_STMT_PREPARE:
return;
default: { }
}
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
return;
Expand Down Expand Up @@ -180,6 +193,12 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
* tryParseSql
*/
public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
switch (connectContext.getCommand()) {
case COM_STMT_EXECUTE:
case COM_STMT_PREPARE:
return Optional.empty();
default: { }
}
String key = generateCacheKey(connectContext, normalizeSql(sql.trim()));
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
if (sqlCacheContext == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public enum TableFrom {
private final Stack<CloseableResource> plannerResources = new Stack<>();

// placeholder params for prepared statement
private List<Placeholder> placeholders;
private List<Placeholder> placeholders = new ArrayList<>();

// all tables in query
private boolean needLockTables = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ public LogicalPlan visitCreateTable(DorisParser.CreateTableContext ctx) {
if (ctx.propertyClause() != null) {
List<DorisParser.PropertyClauseContext> propertyClauseContexts = ctx.propertyClause();
for (DorisParser.PropertyClauseContext propertyClauseContext : propertyClauseContexts) {
encryptProperty(visitPropertyClause(propertyClauseContext),
propertyClauseContext.fileProperties.start.getStartIndex(),
propertyClauseContext.fileProperties.stop.getStopIndex());
if (propertyClauseContext != null) {
encryptProperty(visitPropertyClause(propertyClauseContext),
propertyClauseContext.fileProperties.start.getStartIndex(),
propertyClauseContext.fileProperties.stop.getStopIndex());
}
}
}
return super.visitCreateTable(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -68,9 +70,13 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
throw new AnalysisException(
"prepare statement " + stmtName + " not found, maybe expired");
}
PrepareCommand prepareCommand = (PrepareCommand) preparedStmtCtx.command;
LogicalPlanAdapter planAdapter = new LogicalPlanAdapter(prepareCommand.getLogicalPlan(), executor.getContext()
.getStatementContext());
PrepareCommand prepareCommand = preparedStmtCtx.command;
LogicalPlan logicalPlan = prepareCommand.getLogicalPlan();
if (logicalPlan instanceof LogicalSqlCache) {
throw new AnalysisException("Unsupported sql cache for server prepared statement");
}
LogicalPlanAdapter planAdapter = new LogicalPlanAdapter(
logicalPlan, executor.getContext().getStatementContext());
executor.setParsedStmt(planAdapter);
// If it's not a short circuit query or schema version is different(indicates schema changed) or
// has nondeterministic functions in statement, then need to do reanalyze and plan
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,17 @@ public void executeQuery(String originStmt) throws Exception {
private List<StatementBase> parseFromSqlCache(String originStmt) {
StatementContext statementContext = new StatementContext(ctx, new OriginStatement(originStmt, 0));
ctx.setStatementContext(statementContext);

// the mysql protocol has different between COM_QUERY and COM_STMT_EXECUTE,
// the sql cache use the result of COM_QUERY, so we can not provide the
// result of sql cache for COM_STMT_EXECUTE/COM_STMT_PREPARE
switch (ctx.getCommand()) {
case COM_STMT_EXECUTE:
case COM_STMT_PREPARE:
return null;
default: { }
}

try {
Optional<Pair<ExplainOptions, String>> explainPlan = NereidsParser.tryParseExplainPlan(originStmt);
String cacheSqlKey = originStmt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TestAction implements SuiteAction {
} else {
if (exception != null || result.exception != null) {
def msg = result.exception?.toString()
log.info("Exception: ${msg}")
log.error("Exception: ${msg}", exception != null ? exception : result.exception)
Assert.assertTrue("Expect exception msg contains '${exception}', but meet '${msg}'",
msg != null && exception != null && msg.contains(exception))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,7 @@ suite("parse_sql_from_sql_cache") {
}

def dbName = (sql "select database()")[0][0].toString()
foreachFrontends { fe ->
def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/${dbName}"
connect(context.config.jdbcUser, context.config.jdbcPassword, url) {
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
}
}
sql "ADMIN SET ALL FRONTENDS CONFIG ('cache_last_version_interval_second' = '10')"

// make sure if the table has been dropped, the cache should invalidate,
// so we should retry multiple times to check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@
// specific language governing permissions and limitations
// under the License.

import com.mysql.cj.ServerPreparedQuery
import com.mysql.cj.jdbc.ConnectionImpl
import com.mysql.cj.jdbc.JdbcStatement
import com.mysql.cj.jdbc.ServerPreparedStatement
import com.mysql.cj.jdbc.StatementImpl
import org.apache.doris.regression.util.JdbcUtils

import java.lang.reflect.Field
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.util.concurrent.CopyOnWriteArrayList

suite("prepare_stmt_with_sql_cache") {

Expand All @@ -38,11 +31,13 @@ suite("prepare_stmt_with_sql_cache") {
insert into test_prepare_stmt_with_sql_cache select * from numbers('number'='100');
"""

sql "ADMIN SET ALL FRONTENDS CONFIG ('cache_last_version_interval_second' = '10')"

def db = (sql "select database()")[0][0].toString()

def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
def serverPrepareUrl = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)

connect(context.config.jdbcUser, context.config.jdbcPassword, url) {
connect(context.config.jdbcUser, context.config.jdbcPassword, serverPrepareUrl) {
sql "set enable_sql_cache=true"
for (def i in 0..<10) {
try (PreparedStatement pstmt = prepareStatement("select * from test_prepare_stmt_with_sql_cache where id=?")) {
Expand All @@ -54,4 +49,24 @@ suite("prepare_stmt_with_sql_cache") {
}
}
}

sleep(10 * 1000)

connect(context.config.jdbcUser, context.config.jdbcPassword, context.config.jdbcUrl) {
sql "use ${db}"
sql "set enable_sql_cache=true"
test {
sql "select * from test_prepare_stmt_with_sql_cache where id=10"
result([[10]])
}
}

connect(context.config.jdbcUser, context.config.jdbcPassword, serverPrepareUrl) {
sql "use ${db}"
sql "set enable_sql_cache=true"
test {
sql "select * from test_prepare_stmt_with_sql_cache where id=10"
result(([[10]]))
}
}
}
Loading