diff --git a/session/bootstrap.go b/session/bootstrap.go index 14f1e21c4904a..4a0df857025d4 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -539,7 +539,7 @@ var ( func checkBootstrapped(s Session) (bool, error) { // Check if system db exists. - _, err := s.Execute(context.Background(), fmt.Sprintf("USE %s;", mysql.SystemDB)) + _, err := s.ExecuteInternal(context.Background(), "USE %n", mysql.SystemDB) if err != nil && infoschema.ErrDatabaseNotExists.NotEqual(err) { logutil.BgLogger().Fatal("check bootstrap error", zap.Error(err)) @@ -565,20 +565,21 @@ func checkBootstrapped(s Session) (bool, error) { // getTiDBVar gets variable value from mysql.tidb table. // Those variables are used by TiDB server. func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) { - sql := fmt.Sprintf(`SELECT HIGH_PRIORITY VARIABLE_VALUE FROM %s.%s WHERE VARIABLE_NAME="%s"`, - mysql.SystemDB, mysql.TiDBTable, name) ctx := context.Background() - rs, err := s.Execute(ctx, sql) + rs, err := s.ExecuteInternal(ctx, `SELECT HIGH_PRIORITY VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME= %?`, + mysql.SystemDB, + mysql.TiDBTable, + name, + ) if err != nil { return "", true, errors.Trace(err) } - if len(rs) != 1 { + if rs == nil { return "", true, errors.New("Wrong number of Recordset") } - r := rs[0] - defer terror.Call(r.Close) - req := r.NewChunk() - err = r.Next(ctx, req) + defer terror.Call(rs.Close) + req := rs.NewChunk() + err = rs.Next(ctx, req) if err != nil || req.NumRows() == 0 { return "", true, errors.Trace(err) } @@ -604,7 +605,7 @@ func upgrade(s Session) { } updateBootstrapVer(s) - _, err = s.Execute(context.Background(), "COMMIT") + _, err = s.ExecuteInternal(context.Background(), "COMMIT") if err != nil { sleepTime := 1 * time.Second @@ -651,9 +652,7 @@ func upgradeToVer3(s Session, ver int64) { return } // Version 3 fix tx_read_only variable value. - sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %s.%s SET variable_value = '0' WHERE variable_name = 'tx_read_only';", - mysql.SystemDB, mysql.GlobalVariablesTable) - mustExecute(s, sql) + mustExecute(s, "UPDATE HIGH_PRIORITY %n.%n SET variable_value = '0' WHERE variable_name = 'tx_read_only';", mysql.SystemDB, mysql.GlobalVariablesTable) } // upgradeToVer4 updates to version 4. @@ -661,8 +660,7 @@ func upgradeToVer4(s Session, ver int64) { if ver >= version4 { return } - sql := CreateStatsMetaTable - mustExecute(s, sql) + mustExecute(s, CreateStatsMetaTable) } func upgradeToVer5(s Session, ver int64) { @@ -696,7 +694,7 @@ func upgradeToVer8(s Session, ver int64) { return } // This is a dummy upgrade, it checks whether upgradeToVer7 success, if not, do it again. - if _, err := s.Execute(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` FROM mysql.user LIMIT 0"); err == nil { + if _, err := s.ExecuteInternal(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` FROM mysql.user LIMIT 0"); err == nil { return } upgradeToVer7(s, ver) @@ -712,7 +710,7 @@ func upgradeToVer9(s Session, ver int64) { } func doReentrantDDL(s Session, sql string, ignorableErrs ...error) { - _, err := s.Execute(context.Background(), sql) + _, err := s.ExecuteInternal(context.Background(), sql) for _, ignorableErr := range ignorableErrs { if terror.ErrorEqual(err, ignorableErr) { return @@ -738,7 +736,7 @@ func upgradeToVer11(s Session, ver int64) { if ver >= version11 { return } - _, err := s.Execute(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`") + _, err := s.ExecuteInternal(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`") if err != nil { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { return @@ -753,21 +751,20 @@ func upgradeToVer12(s Session, ver int64) { return } ctx := context.Background() - _, err := s.Execute(ctx, "BEGIN") + _, err := s.ExecuteInternal(ctx, "BEGIN") terror.MustNil(err) sql := "SELECT HIGH_PRIORITY user, host, password FROM mysql.user WHERE password != ''" - rs, err := s.Execute(ctx, sql) + rs, err := s.ExecuteInternal(ctx, sql) if terror.ErrorEqual(err, core.ErrUnknownColumn) { sql := "SELECT HIGH_PRIORITY user, host, authentication_string FROM mysql.user WHERE authentication_string != ''" - rs, err = s.Execute(ctx, sql) + rs, err = s.ExecuteInternal(ctx, sql) } terror.MustNil(err) - r := rs[0] sqls := make([]string, 0, 1) - defer terror.Call(r.Close) - req := r.NewChunk() + defer terror.Call(rs.Close) + req := rs.NewChunk() it := chunk.NewIterator4Chunk(req) - err = r.Next(ctx, req) + err = rs.Next(ctx, req) for err == nil && req.NumRows() != 0 { for row := it.Begin(); row != it.End(); row = it.Next() { user := row.GetString(0) @@ -779,7 +776,7 @@ func upgradeToVer12(s Session, ver int64) { updateSQL := fmt.Sprintf(`UPDATE HIGH_PRIORITY mysql.user SET password = "%s" WHERE user="%s" AND host="%s"`, newPass, user, host) sqls = append(sqls, updateSQL) } - err = r.Next(ctx, req) + err = rs.Next(ctx, req) } terror.MustNil(err) @@ -809,7 +806,7 @@ func upgradeToVer13(s Session, ver int64) { } ctx := context.Background() for _, sql := range sqls { - _, err := s.Execute(ctx, sql) + _, err := s.ExecuteInternal(ctx, sql) if err != nil { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { continue @@ -838,7 +835,7 @@ func upgradeToVer14(s Session, ver int64) { } ctx := context.Background() for _, sql := range sqls { - _, err := s.Execute(ctx, sql) + _, err := s.ExecuteInternal(ctx, sql) if err != nil { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { continue @@ -853,7 +850,7 @@ func upgradeToVer15(s Session, ver int64) { return } var err error - _, err = s.Execute(context.Background(), CreateGCDeleteRangeTable) + _, err = s.ExecuteInternal(context.Background(), CreateGCDeleteRangeTable) if err != nil { logutil.BgLogger().Fatal("upgradeToVer15 error", zap.Error(err)) } @@ -923,9 +920,13 @@ func upgradeToVer23(s Session, ver int64) { // writeSystemTZ writes system timezone info into mysql.tidb func writeSystemTZ(s Session) { - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", "%s", "TiDB Global System Timezone.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`, - mysql.SystemDB, mysql.TiDBTable, tidbSystemTZ, timeutil.InferSystemTZ(), timeutil.InferSystemTZ()) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB Global System Timezone.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, + mysql.SystemDB, + mysql.TiDBTable, + tidbSystemTZ, + timeutil.InferSystemTZ(), + timeutil.InferSystemTZ(), + ) } // upgradeToVer24 initializes `System` timezone according to docs/design/2018-09-10-adding-tz-env.md @@ -1054,7 +1055,7 @@ func upgradeToVer38(s Session, ver int64) { return } var err error - _, err = s.Execute(context.Background(), CreateGlobalPrivTable) + _, err = s.ExecuteInternal(context.Background(), CreateGlobalPrivTable) if err != nil { logutil.BgLogger().Fatal("upgradeToVer38 error", zap.Error(err)) } @@ -1066,9 +1067,9 @@ func writeNewCollationParameter(s Session, flag bool) { if flag { b = varTrue } - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%s', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%s'`, - mysql.SystemDB, mysql.TiDBTable, tidbNewCollationEnabled, b, comment, b) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, + mysql.SystemDB, mysql.TiDBTable, tidbNewCollationEnabled, b, comment, b, + ) } func upgradeToVer40(s Session, ver int64) { @@ -1104,14 +1105,14 @@ func upgradeToVer42(s Session, ver int64) { // Convert statement summary global variables to non-empty values. func writeStmtSummaryVars(s Session) { - sql := fmt.Sprintf("UPDATE %s.%s SET variable_value='%%s' WHERE variable_name='%%s' AND variable_value=''", mysql.SystemDB, mysql.GlobalVariablesTable) + sql := "UPDATE %n.%n SET variable_value= %? WHERE variable_name= %? AND variable_value=''" stmtSummaryConfig := config.GetGlobalConfig().StmtSummary - mustExecute(s, fmt.Sprintf(sql, variable.BoolToOnOff(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary)) - mustExecute(s, fmt.Sprintf(sql, variable.BoolToOnOff(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery)) - mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval)) - mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize)) - mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount)) - mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength)) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength) } func upgradeToVer43(s Session, ver int64) { @@ -1219,13 +1220,12 @@ func upgradeToVer55(s Session, ver int64) { selectSQL := "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(names, quoteCommaQuote) + "')" ctx := context.Background() - rs, err := s.Execute(ctx, selectSQL) + rs, err := s.ExecuteInternal(ctx, selectSQL) terror.MustNil(err) - r := rs[0] - defer terror.Call(r.Close) - req := r.NewChunk() + defer terror.Call(rs.Close) + req := rs.NewChunk() it := chunk.NewIterator4Chunk(req) - err = r.Next(ctx, req) + err = rs.Next(ctx, req) for err == nil && req.NumRows() != 0 { for row := it.Begin(); row != it.End(); row = it.Next() { n := strings.ToLower(row.GetString(0)) @@ -1234,7 +1234,7 @@ func upgradeToVer55(s Session, ver int64) { return } } - err = r.Next(ctx, req) + err = rs.Next(ctx, req) } terror.MustNil(err) @@ -1270,9 +1270,9 @@ func initBindInfoTable(s Session) { } func insertBuiltinBindInfoRow(s Session) { - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES ("%s", "%s", "mysql", "%s", "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", "%s")`, - bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES (%?, %?, "mysql", %?, "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", %?)`, + bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin, + ) } func upgradeToVer59(s Session, ver int64) { @@ -1400,9 +1400,9 @@ func updateBindInfo(iter *chunk.Iterator4Chunk, p *parser.Parser, bindMap map[st func writeMemoryQuotaQuery(s Session) { comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x+" - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%d', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%d'`, - mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, + mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30, + ) } func upgradeToVer62(s Session, ver int64) { @@ -1431,17 +1431,17 @@ func upgradeToVer64(s Session, ver int64) { func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%s', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%s'`, - mysql.SystemDB, mysql.TiDBTable, tidbDefOOMAction, config.OOMActionLog, comment, config.OOMActionLog) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, + mysql.SystemDB, mysql.TiDBTable, tidbDefOOMAction, config.OOMActionLog, comment, config.OOMActionLog, + ) } // updateBootstrapVer updates bootstrap version variable in mysql.TiDB table. func updateBootstrapVer(s Session) { // Update bootstrap version. - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", "%d", "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%d"`, - mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, + mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion, + ) } // getBootstrapVersion gets bootstrap version from mysql.tidb table; @@ -1461,7 +1461,7 @@ func doDDLWorks(s Session) { // Create a test database. mustExecute(s, "CREATE DATABASE IF NOT EXISTS test") // Create system db. - mustExecute(s, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", mysql.SystemDB)) + mustExecute(s, "CREATE DATABASE IF NOT EXISTS %n", mysql.SystemDB) // Create user table. mustExecute(s, CreateUserTable) // Create privilege tables. @@ -1507,6 +1507,7 @@ func doDDLWorks(s Session) { // doDMLWorks executes DML statements in bootstrap stage. // All the statements run in a single transaction. +// TODO: sanitize. func doDMLWorks(s Session) { mustExecute(s, "BEGIN") @@ -1548,14 +1549,13 @@ func doDMLWorks(s Session) { strings.Join(values, ", ")) mustExecute(s, sql) - sql = fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES("%s", "%s", "Bootstrap flag. Do not delete.") - ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`, - mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, varTrue, varTrue) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap flag. Do not delete.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, + mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, varTrue, varTrue, + ) - sql = fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES("%s", "%d", "Bootstrap version. Do not delete.")`, - mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap version. Do not delete.")`, + mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, + ) writeSystemTZ(s) @@ -1565,7 +1565,7 @@ func doDMLWorks(s Session) { writeStmtSummaryVars(s) - _, err := s.Execute(context.Background(), "COMMIT") + _, err := s.ExecuteInternal(context.Background(), "COMMIT") if err != nil { sleepTime := 1 * time.Second logutil.BgLogger().Info("doDMLWorks failed", zap.Error(err), zap.Duration("sleeping time", sleepTime)) @@ -1582,8 +1582,8 @@ func doDMLWorks(s Session) { } } -func mustExecute(s Session, sql string) { - _, err := s.ExecuteInternal(context.Background(), sql) +func mustExecute(s Session, sql string, args ...interface{}) { + _, err := s.ExecuteInternal(context.Background(), sql, args...) if err != nil { debug.PrintStack() logutil.BgLogger().Fatal("mustExecute error", zap.Error(err)) diff --git a/session/session.go b/session/session.go index 851fd4c2a5ead..973d6373e338c 100644 --- a/session/session.go +++ b/session/session.go @@ -99,15 +99,6 @@ var ( tiKVGCAutoConcurrency = "tikv_gc_auto_concurrency" ) -var gcVariableComments = map[string]string{ - variable.TiDBGCRunInterval: "GC run interval, at least 10m, in Go format.", - variable.TiDBGCLifetime: "All versions within life time will not be collected by GC, at least 10m, in Go format.", - variable.TiDBGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", - variable.TiDBGCEnable: "Current GC enable status", - tiKVGCAutoConcurrency: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", - variable.TiDBGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", -} - var gcVariableMap = map[string]string{ variable.TiDBGCRunInterval: "tikv_gc_run_interval", variable.TiDBGCLifetime: "tikv_gc_life_time", @@ -995,15 +986,19 @@ func drainRecordSet(ctx context.Context, se *session, rs sqlexec.RecordSet) ([]c } } -// getExecRet executes restricted sql and the result is one column. +// getTableValue executes restricted sql and the result is one column. // It returns a string value. -func (s *session) getExecRet(ctx sessionctx.Context, sql string) (string, error) { - rows, fields, err := s.ExecRestrictedSQL(sql) +func (s *session) getTableValue(ctx context.Context, tblName string, varName string) (string, error) { + stmt, err := s.ParseWithParams(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?", mysql.SystemDB, tblName, varName) + if err != nil { + return "", err + } + rows, fields, err := s.ExecRestrictedStmt(ctx, stmt) if err != nil { return "", err } if len(rows) == 0 { - return "", executor.ErrResultIsEmpty + return "", errResultIsEmpty } d := rows[0].GetDatum(0, &fields[0].Column.FieldType) value, err := d.ToString() @@ -1013,6 +1008,34 @@ func (s *session) getExecRet(ctx sessionctx.Context, sql string) (string, error) return value, nil } +var gcVariableComments = map[string]string{ + variable.TiDBGCRunInterval: "GC run interval, at least 10m, in Go format.", + variable.TiDBGCLifetime: "All versions within life time will not be collected by GC, at least 10m, in Go format.", + variable.TiDBGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", + variable.TiDBGCEnable: "Current GC enable status", + tiKVGCAutoConcurrency: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", + variable.TiDBGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", +} + +// replaceTableValue executes restricted sql updates the variable value +func (s *session) replaceTableValue(ctx context.Context, tblName string, varName, val string) error { + if tblName == mysql.TiDBTable { // maintain comment metadata + comment := gcVariableComments[varName] + stmt, err := s.ParseWithParams(ctx, `REPLACE INTO %n.%n (variable_name, variable_value, comment) VALUES (%?, %?, %?)`, mysql.SystemDB, tblName, varName, val, comment) + if err != nil { + return err + } + _, _, err = s.ExecRestrictedStmt(ctx, stmt) + return err + } + stmt, err := s.ParseWithParams(ctx, `REPLACE INTO %n.%n (variable_name, variable_value) VALUES (%?, %?)`, mysql.SystemDB, tblName, varName, val) + if err != nil { + return err + } + _, _, err = s.ExecRestrictedStmt(ctx, stmt) + return err +} + func (s *session) varFromTiDBTable(name string) bool { switch name { case variable.TiDBGCConcurrency, variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, variable.TiDBGCScanLockMode: @@ -1030,11 +1053,9 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { // When running bootstrap or upgrade, we should not access global storage. return "", nil } - sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM %s.%s WHERE VARIABLE_NAME="%s";`, - mysql.SystemDB, mysql.GlobalVariablesTable, name) - sysVar, err := s.getExecRet(s, sql) + sysVar, err := s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) if err != nil { - if executor.ErrResultIsEmpty.Equal(err) { + if errResultIsEmpty.Equal(err) { sv := variable.GetSysVar(name) if sv != nil { return sv.Value, nil @@ -1081,18 +1102,14 @@ func (s *session) SetGlobalSysVar(name, value string) error { } } variable.CheckDeprecationSetSystemVar(s.sessionVars, name) - sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, name, escapeUserString(sVal)) - _, _, err = s.ExecRestrictedSQL(sql) + stmt, err := s.ParseWithParams(context.TODO(), "REPLACE %n.%n VALUES (%?, %?)", mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal) + if err != nil { + return err + } + _, _, err = s.ExecRestrictedStmt(context.TODO(), stmt) return err } -// escape user supplied string for internal SQL. Not safe for all cases, since it doesn't -// handle quote-type, sql-mode, character set breakout. -func escapeUserString(str string) string { - return strings.ReplaceAll(str, `'`, `\'`) -} - // setTiDBTableValue handles tikv_* sysvars which need to update mysql.tidb // for backwards compatibility. Validation has already been performed. func (s *session) setTiDBTableValue(name, val string) error { @@ -1101,17 +1118,13 @@ func (s *session) setTiDBTableValue(name, val string) error { if val == "-1" { autoConcurrency = "true" } - sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%[1]s', '%[2]s', '%[3]s') - ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, tiKVGCAutoConcurrency, autoConcurrency, gcVariableComments[name]) - _, _, err := s.ExecRestrictedSQL(sql) + err := s.replaceTableValue(context.TODO(), mysql.TiDBTable, tiKVGCAutoConcurrency, autoConcurrency) if err != nil { return err } } val = onOffToTrueFalse(val) - sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%[1]s', '%[2]s', '%[3]s') - ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, gcVariableMap[name], escapeUserString(val), gcVariableComments[name]) - _, _, err := s.ExecRestrictedSQL(sql) + err := s.replaceTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name], val) return err } @@ -1142,14 +1155,12 @@ func onOffToTrueFalse(str string) string { func (s *session) getTiDBTableValue(name, val string) (string, error) { if name == variable.TiDBGCConcurrency { // Check if autoconcurrency is set - sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, tiKVGCAutoConcurrency) - autoConcurrencyVal, err := s.getExecRet(s, sql) + autoConcurrencyVal, err := s.getTableValue(context.TODO(), mysql.TiDBTable, tiKVGCAutoConcurrency) if err == nil && strings.EqualFold(autoConcurrencyVal, "true") { return "-1", nil // convention for "AUTO" } } - sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, gcVariableMap[name]) - tblValue, err := s.getExecRet(s, sql) + tblValue, err := s.getTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name]) if err != nil { return val, nil // mysql.tidb value does not exist. } @@ -1164,24 +1175,24 @@ func (s *session) getTiDBTableValue(name, val string) (string, error) { zap.String("tblName", gcVariableMap[name]), zap.String("tblValue", tblValue), zap.String("restoredValue", val)) - sql := fmt.Sprintf(`REPLACE INTO mysql.tidb (variable_name, variable_value, comment) - VALUES ('%s', '%s', '%s')`, gcVariableMap[name], escapeUserString(val), gcVariableComments[name]) - _, _, err = s.ExecRestrictedSQL(sql) + err = s.replaceTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name], val) return val, err } if validatedVal != val { // The sysvar value is out of sync. - sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, gcVariableMap[name], escapeUserString(validatedVal)) - _, _, err = s.ExecRestrictedSQL(sql) + err = s.replaceTableValue(context.TODO(), mysql.GlobalVariablesTable, gcVariableMap[name], validatedVal) return validatedVal, err } return validatedVal, nil } func (s *session) ensureFullGlobalStats() error { - rows, _, err := s.ExecRestrictedSQL(`select count(1) from information_schema.tables t where t.create_options = 'partitioned' - and not exists (select 1 from mysql.stats_meta m where m.table_id = t.tidb_table_id)`) + stmt, err := s.ParseWithParams(context.TODO(), `select count(1) from information_schema.tables t where t.create_options = 'partitioned' + and not exists (select 1 from mysql.stats_meta m where m.table_id = t.tidb_table_id)`) + if err != nil { + return err + } + rows, _, err := s.ExecRestrictedStmt(context.TODO(), stmt) if err != nil { return err } @@ -2175,11 +2186,6 @@ func CreateSessionWithOpt(store kv.Storage, opt *Opt) (Session, error) { return s, nil } -// loadSystemTZ loads systemTZ from mysql.tidb -func loadSystemTZ(se *session) (string, error) { - return loadParameter(se, "system_tz") -} - // loadCollationParameter loads collation parameter from mysql.tidb func loadCollationParameter(se *session) (bool, error) { para, err := loadParameter(se, tidbNewCollationEnabled) @@ -2235,25 +2241,7 @@ var ( // loadParameter loads read-only parameter from mysql.tidb func loadParameter(se *session, name string) (string, error) { - sql := "select variable_value from mysql.tidb where variable_name = '" + name + "'" - rs, errLoad := se.ExecuteInternal(context.Background(), sql) - if errLoad != nil { - return "", errLoad - } - // the record of mysql.tidb under where condition: variable_name = $name should shall only be one. - defer func() { - if err := rs.Close(); err != nil { - logutil.BgLogger().Error("close result set error", zap.Error(err)) - } - }() - req := rs.NewChunk() - if err := rs.Next(context.Background(), req); err != nil { - return "", err - } - if req.NumRows() == 0 { - return "", errResultIsEmpty - } - return req.GetRow(0).GetString(0), nil + return se.getTableValue(context.TODO(), mysql.TiDBTable, name) } // BootstrapSession runs the first time when the TiDB server start. @@ -2270,8 +2258,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } } - initLoadCommonGlobalVarsSQL() - ver := getStoreBootstrapVersion(store) if ver == notBootstrapped { runInBootstrapSession(store, bootstrap) @@ -2283,8 +2269,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { if err != nil { return nil, err } + // get system tz from mysql.tidb - tz, err := loadSystemTZ(se) + tz, err := se.getTableValue(context.TODO(), mysql.TiDBTable, "system_tz") if err != nil { return nil, err } @@ -2649,24 +2636,8 @@ var builtinGlobalVariable = []string{ variable.TiDBEnableTiFlashFallbackTiKV, } -var ( - loadCommonGlobalVarsSQLOnce sync.Once - loadCommonGlobalVarsSQL string -) - -func initLoadCommonGlobalVarsSQL() { - loadCommonGlobalVarsSQLOnce.Do(func() { - vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) - if len(variable.PluginVarNames) > 0 { - vars = append(vars, variable.PluginVarNames...) - } - loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')" - }) -} - // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { - initLoadCommonGlobalVarsSQL() vars := s.sessionVars if vars.CommonGlobalLoaded { return nil @@ -2681,7 +2652,17 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { // When a lot of connections connect to TiDB simultaneously, it can protect TiKV meta region from overload. gvc := domain.GetDomain(s).GetGlobalVarsCache() loadFunc := func() ([]chunk.Row, []*ast.ResultField, error) { - return s.ExecRestrictedSQL(loadCommonGlobalVarsSQL) + vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) + if len(variable.PluginVarNames) > 0 { + vars = append(vars, variable.PluginVarNames...) + } + + stmt, err := s.ParseWithParams(context.TODO(), "select HIGH_PRIORITY * from mysql.global_variables where variable_name in (%?)", vars) + if err != nil { + return nil, nil, errors.Trace(err) + } + + return s.ExecRestrictedStmt(context.TODO(), stmt) } rows, fields, err := gvc.LoadGlobalVariables(loadFunc) if err != nil { diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 815508a10f35a..772553fc840cc 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -267,12 +267,12 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { // UpdateSessionVar updates the necessary session variables for the stats reader. func (h *Handle) UpdateSessionVar() error { + h.mu.Lock() + defer h.mu.Unlock() verInString, err := h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion) if err != nil { return err } - h.mu.Lock() - defer h.mu.Unlock() ver, err := strconv.ParseInt(verInString, 10, 64) if err != nil { return err diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index 1b52f78549678..db16779fdbb8a 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -147,7 +147,7 @@ func (tk *TestKit) GetConnectionID() { } } -// Exec executes a sql statement. +// Exec executes a sql statement using the prepared stmt API func (tk *TestKit) Exec(sql string, args ...interface{}) (sqlexec.RecordSet, error) { var err error if tk.Se == nil {