From dff89fd6e38c973fb5ca521fa2938b2154f0f6d5 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Thu, 18 Feb 2021 22:10:30 -0700 Subject: [PATCH 1/3] cherry pick #22652 to release-4.0 Signed-off-by: ti-srebot --- session/bootstrap.go | 228 ++++++++++++++++++++++++++++-------- session/session.go | 222 ++++++++++++++++++++++++++++++----- statistics/handle/handle.go | 141 ++++++++++++++++++++++ util/testkit/testkit.go | 2 +- 4 files changed, 513 insertions(+), 80 deletions(-) diff --git a/session/bootstrap.go b/session/bootstrap.go index b85bb79e7de13..d23fe7644df93 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -465,7 +465,7 @@ var ( func checkBootstrapped(s Session) (bool, error) { // Check if system db exists. - _, err := s.Execute(context.Background(), fmt.Sprintf("USE %s;", mysql.SystemDB)) + _, err := s.ExecuteInternal(context.Background(), "USE %n", mysql.SystemDB) if err != nil && infoschema.ErrDatabaseNotExists.NotEqual(err) { logutil.BgLogger().Fatal("check bootstrap error", zap.Error(err)) @@ -491,20 +491,21 @@ func checkBootstrapped(s Session) (bool, error) { // getTiDBVar gets variable value from mysql.tidb table. // Those variables are used by TiDB server. func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) { - sql := fmt.Sprintf(`SELECT HIGH_PRIORITY VARIABLE_VALUE FROM %s.%s WHERE VARIABLE_NAME="%s"`, - mysql.SystemDB, mysql.TiDBTable, name) ctx := context.Background() - rs, err := s.Execute(ctx, sql) + rs, err := s.ExecuteInternal(ctx, `SELECT HIGH_PRIORITY VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME= %?`, + mysql.SystemDB, + mysql.TiDBTable, + name, + ) if err != nil { return "", true, errors.Trace(err) } - if len(rs) != 1 { + if rs == nil { return "", true, errors.New("Wrong number of Recordset") } - r := rs[0] - defer terror.Call(r.Close) - req := r.NewChunk() - err = r.Next(ctx, req) + defer terror.Call(rs.Close) + req := rs.NewChunk() + err = rs.Next(ctx, req) if err != nil || req.NumRows() == 0 { return "", true, errors.Trace(err) } @@ -530,7 +531,7 @@ func upgrade(s Session) { } updateBootstrapVer(s) - _, err = s.Execute(context.Background(), "COMMIT") + _, err = s.ExecuteInternal(context.Background(), "COMMIT") if err != nil { sleepTime := 1 * time.Second @@ -577,9 +578,13 @@ func upgradeToVer3(s Session, ver int64) { return } // Version 3 fix tx_read_only variable value. +<<<<<<< HEAD sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %s.%s set variable_value = '0' where variable_name = 'tx_read_only';", mysql.SystemDB, mysql.GlobalVariablesTable) mustExecute(s, sql) +======= + mustExecute(s, "UPDATE HIGH_PRIORITY %n.%n SET variable_value = '0' WHERE variable_name = 'tx_read_only';", mysql.SystemDB, mysql.GlobalVariablesTable) +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) } // upgradeToVer4 updates to version 4. @@ -587,8 +592,7 @@ func upgradeToVer4(s Session, ver int64) { if ver >= version4 { return } - sql := CreateStatsMetaTable - mustExecute(s, sql) + mustExecute(s, CreateStatsMetaTable) } func upgradeToVer5(s Session, ver int64) { @@ -622,7 +626,11 @@ func upgradeToVer8(s Session, ver int64) { return } // This is a dummy upgrade, it checks whether upgradeToVer7 success, if not, do it again. +<<<<<<< HEAD if _, err := s.Execute(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` from mysql.user limit 0"); err == nil { +======= + if _, err := s.ExecuteInternal(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` FROM mysql.user LIMIT 0"); err == nil { +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) return } upgradeToVer7(s, ver) @@ -638,7 +646,7 @@ func upgradeToVer9(s Session, ver int64) { } func doReentrantDDL(s Session, sql string, ignorableErrs ...error) { - _, err := s.Execute(context.Background(), sql) + _, err := s.ExecuteInternal(context.Background(), sql) for _, ignorableErr := range ignorableErrs { if terror.ErrorEqual(err, ignorableErr) { return @@ -664,7 +672,11 @@ func upgradeToVer11(s Session, ver int64) { if ver >= version11 { return } +<<<<<<< HEAD _, err := s.Execute(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` enum('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`") +======= + _, err := s.ExecuteInternal(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`") +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) if err != nil { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { return @@ -679,21 +691,20 @@ func upgradeToVer12(s Session, ver int64) { return } ctx := context.Background() - _, err := s.Execute(ctx, "BEGIN") + _, err := s.ExecuteInternal(ctx, "BEGIN") terror.MustNil(err) sql := "SELECT HIGH_PRIORITY user, host, password FROM mysql.user WHERE password != ''" - rs, err := s.Execute(ctx, sql) + rs, err := s.ExecuteInternal(ctx, sql) if terror.ErrorEqual(err, core.ErrUnknownColumn) { sql := "SELECT HIGH_PRIORITY user, host, authentication_string FROM mysql.user WHERE authentication_string != ''" - rs, err = s.Execute(ctx, sql) + rs, err = s.ExecuteInternal(ctx, sql) } terror.MustNil(err) - r := rs[0] sqls := make([]string, 0, 1) - defer terror.Call(r.Close) - req := r.NewChunk() + defer terror.Call(rs.Close) + req := rs.NewChunk() it := chunk.NewIterator4Chunk(req) - err = r.Next(ctx, req) + err = rs.Next(ctx, req) for err == nil && req.NumRows() != 0 { for row := it.Begin(); row != it.End(); row = it.Next() { user := row.GetString(0) @@ -705,7 +716,7 @@ func upgradeToVer12(s Session, ver int64) { updateSQL := fmt.Sprintf(`UPDATE HIGH_PRIORITY mysql.user set password = "%s" where user="%s" and host="%s"`, newPass, user, host) sqls = append(sqls, updateSQL) } - err = r.Next(ctx, req) + err = rs.Next(ctx, req) } terror.MustNil(err) @@ -735,7 +746,7 @@ func upgradeToVer13(s Session, ver int64) { } ctx := context.Background() for _, sql := range sqls { - _, err := s.Execute(ctx, sql) + _, err := s.ExecuteInternal(ctx, sql) if err != nil { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { continue @@ -764,7 +775,7 @@ func upgradeToVer14(s Session, ver int64) { } ctx := context.Background() for _, sql := range sqls { - _, err := s.Execute(ctx, sql) + _, err := s.ExecuteInternal(ctx, sql) if err != nil { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { continue @@ -779,7 +790,7 @@ func upgradeToVer15(s Session, ver int64) { return } var err error - _, err = s.Execute(context.Background(), CreateGCDeleteRangeTable) + _, err = s.ExecuteInternal(context.Background(), CreateGCDeleteRangeTable) if err != nil { logutil.BgLogger().Fatal("upgradeToVer15 error", zap.Error(err)) } @@ -849,9 +860,13 @@ func upgradeToVer23(s Session, ver int64) { // writeSystemTZ writes system timezone info into mysql.tidb func writeSystemTZ(s Session) { - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", "%s", "TiDB Global System Timezone.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`, - mysql.SystemDB, mysql.TiDBTable, tidbSystemTZ, timeutil.InferSystemTZ(), timeutil.InferSystemTZ()) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB Global System Timezone.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, + mysql.SystemDB, + mysql.TiDBTable, + tidbSystemTZ, + timeutil.InferSystemTZ(), + timeutil.InferSystemTZ(), + ) } // upgradeToVer24 initializes `System` timezone according to docs/design/2018-09-10-adding-tz-env.md @@ -980,7 +995,7 @@ func upgradeToVer38(s Session, ver int64) { return } var err error - _, err = s.Execute(context.Background(), CreateGlobalPrivTable) + _, err = s.ExecuteInternal(context.Background(), CreateGlobalPrivTable) if err != nil { logutil.BgLogger().Fatal("upgradeToVer38 error", zap.Error(err)) } @@ -1002,9 +1017,9 @@ func writeNewCollationParameter(s Session, flag bool) { if flag { b = varTrue } - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%s', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%s'`, - mysql.SystemDB, mysql.TiDBTable, tidbNewCollationEnabled, b, comment, b) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, + mysql.SystemDB, mysql.TiDBTable, tidbNewCollationEnabled, b, comment, b, + ) } func upgradeToVer40(s Session, ver int64) { @@ -1040,14 +1055,23 @@ func upgradeToVer42(s Session, ver int64) { // Convert statement summary global variables to non-empty values. func writeStmtSummaryVars(s Session) { - sql := fmt.Sprintf("UPDATE %s.%s SET variable_value='%%s' WHERE variable_name='%%s' AND variable_value=''", mysql.SystemDB, mysql.GlobalVariablesTable) + sql := "UPDATE %n.%n SET variable_value= %? WHERE variable_name= %? AND variable_value=''" stmtSummaryConfig := config.GetGlobalConfig().StmtSummary +<<<<<<< HEAD mustExecute(s, fmt.Sprintf(sql, variable.BoolToIntStr(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary)) mustExecute(s, fmt.Sprintf(sql, variable.BoolToIntStr(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery)) mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval)) mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize)) mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount)) mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength)) +======= + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount) + mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength) +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) } func upgradeToVer43(s Session, ver int64) { @@ -1115,8 +1139,73 @@ func upgradeToVer49(s Session, ver int64) { } } +<<<<<<< HEAD func upgradeToVer50(s Session, ver int64) { if ver >= version50 { +======= +// When cherry-pick upgradeToVer52 to v4.0, we wrongly name it upgradeToVer48. +// If we upgrade from v4.0 to a newer version, the real upgradeToVer48 will be missed. +// So we redo upgradeToVer48 here to make sure the upgrading from v4.0 succeeds. +func upgradeToVer55(s Session, ver int64) { + if ver >= version55 { + return + } + defValues := map[string]string{ + variable.TiDBIndexLookupConcurrency: "4", + variable.TiDBIndexLookupJoinConcurrency: "4", + variable.TiDBHashAggFinalConcurrency: "4", + variable.TiDBHashAggPartialConcurrency: "4", + variable.TiDBWindowConcurrency: "4", + variable.TiDBProjectionConcurrency: "4", + variable.TiDBHashJoinConcurrency: "5", + } + names := make([]string, 0, len(defValues)) + for n := range defValues { + names = append(names, n) + } + + selectSQL := "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(names, quoteCommaQuote) + "')" + ctx := context.Background() + rs, err := s.ExecuteInternal(ctx, selectSQL) + terror.MustNil(err) + defer terror.Call(rs.Close) + req := rs.NewChunk() + it := chunk.NewIterator4Chunk(req) + err = rs.Next(ctx, req) + for err == nil && req.NumRows() != 0 { + for row := it.Begin(); row != it.End(); row = it.Next() { + n := strings.ToLower(row.GetString(0)) + v := row.GetString(1) + if defValue, ok := defValues[n]; !ok || defValue != v { + return + } + } + err = rs.Next(ctx, req) + } + terror.MustNil(err) + + mustExecute(s, "BEGIN") + v := strconv.Itoa(variable.ConcurrencyUnset) + sql := fmt.Sprintf("UPDATE %s.%s SET variable_value='%%s' WHERE variable_name='%%s'", mysql.SystemDB, mysql.GlobalVariablesTable) + for _, name := range names { + mustExecute(s, fmt.Sprintf(sql, v, name)) + } + mustExecute(s, "COMMIT") +} + +// When cherry-pick upgradeToVer54 to v4.0, we wrongly name it upgradeToVer49. +// If we upgrade from v4.0 to a newer version, the real upgradeToVer49 will be missed. +// So we redo upgradeToVer49 here to make sure the upgrading from v4.0 succeeds. +func upgradeToVer56(s Session, ver int64) { + if ver >= version56 { + return + } + doReentrantDDL(s, CreateStatsExtended) +} + +func upgradeToVer57(s Session, ver int64) { + if ver >= version57 { +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) return } insertBuiltinBindInfoRow(s) @@ -1128,9 +1217,9 @@ func initBindInfoTable(s Session) { } func insertBuiltinBindInfoRow(s Session) { - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES ("%s", "%s", "mysql", "%s", "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", "%s")`, - bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES (%?, %?, "mysql", %?, "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", %?)`, + bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin, + ) } type bindInfo struct { @@ -1234,18 +1323,58 @@ func updateBindInfo(iter *chunk.Iterator4Chunk, p *parser.Parser, bindMap map[st } func writeMemoryQuotaQuery(s Session) { +<<<<<<< HEAD comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x" sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%d', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%d'`, mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30) mustExecute(s, sql) } +======= + comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x+" + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, + mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30, + ) +} + +func upgradeToVer62(s Session, ver int64) { + if ver >= version62 { + return + } + doReentrantDDL(s, "ALTER TABLE mysql.stats_buckets ADD COLUMN `ndv` bigint not null default 0", infoschema.ErrColumnExists) +} + +func upgradeToVer63(s Session, ver int64) { + if ver >= version63 { + return + } + doReentrantDDL(s, "ALTER TABLE mysql.user ADD COLUMN `Create_tablespace_priv` ENUM('N','Y') DEFAULT 'N'", infoschema.ErrColumnExists) + mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Create_tablespace_priv='Y' where Super_priv='Y'") +} + +func upgradeToVer64(s Session, ver int64) { + if ver >= version64 { + return + } + doReentrantDDL(s, "ALTER TABLE mysql.user ADD COLUMN `Repl_slave_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Execute_priv`", infoschema.ErrColumnExists) + doReentrantDDL(s, "ALTER TABLE mysql.user ADD COLUMN `Repl_client_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Repl_slave_priv`", infoschema.ErrColumnExists) + mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Repl_slave_priv='Y',Repl_client_priv='Y'") +} + +func writeOOMAction(s Session) { + comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, + mysql.SystemDB, mysql.TiDBTable, tidbDefOOMAction, config.OOMActionLog, comment, config.OOMActionLog, + ) +} + +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) // updateBootstrapVer updates bootstrap version variable in mysql.TiDB table. func updateBootstrapVer(s Session) { // Update bootstrap version. - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", "%d", "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%d"`, - mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, + mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion, + ) } // getBootstrapVersion gets bootstrap version from mysql.tidb table; @@ -1265,7 +1394,7 @@ func doDDLWorks(s Session) { // Create a test database. mustExecute(s, "CREATE DATABASE IF NOT EXISTS test") // Create system db. - mustExecute(s, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", mysql.SystemDB)) + mustExecute(s, "CREATE DATABASE IF NOT EXISTS %n", mysql.SystemDB) // Create user table. mustExecute(s, CreateUserTable) // Create privilege tables. @@ -1307,6 +1436,7 @@ func doDDLWorks(s Session) { // doDMLWorks executes DML statements in bootstrap stage. // All the statements run in a single transaction. +// TODO: sanitize. func doDMLWorks(s Session) { mustExecute(s, "BEGIN") @@ -1334,14 +1464,13 @@ func doDMLWorks(s Session) { strings.Join(values, ", ")) mustExecute(s, sql) - sql = fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES("%s", "%s", "Bootstrap flag. Do not delete.") - ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`, - mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, varTrue, varTrue) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap flag. Do not delete.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, + mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, varTrue, varTrue, + ) - sql = fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES("%s", "%d", "Bootstrap version. Do not delete.")`, - mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion) - mustExecute(s, sql) + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap version. Do not delete.")`, + mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, + ) writeSystemTZ(s) @@ -1351,7 +1480,7 @@ func doDMLWorks(s Session) { writeStmtSummaryVars(s) - _, err := s.Execute(context.Background(), "COMMIT") + _, err := s.ExecuteInternal(context.Background(), "COMMIT") if err != nil { sleepTime := 1 * time.Second logutil.BgLogger().Info("doDMLWorks failed", zap.Error(err), zap.Duration("sleeping time", sleepTime)) @@ -1368,8 +1497,13 @@ func doDMLWorks(s Session) { } } +<<<<<<< HEAD func mustExecute(s Session, sql string) { _, err := s.Execute(context.Background(), sql) +======= +func mustExecute(s Session, sql string, args ...interface{}) { + _, err := s.ExecuteInternal(context.Background(), sql, args...) +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) if err != nil { debug.PrintStack() logutil.BgLogger().Fatal("mustExecute error", zap.Error(err)) diff --git a/session/session.go b/session/session.go index a992f27e85dc2..5fae4b3243440 100644 --- a/session/session.go +++ b/session/session.go @@ -92,6 +92,17 @@ var ( sessionExecuteParseDurationGeneral = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblGeneral) ) +<<<<<<< HEAD +======= +var gcVariableMap = map[string]string{ + variable.TiDBGCRunInterval: "tikv_gc_run_interval", + variable.TiDBGCLifetime: "tikv_gc_life_time", + variable.TiDBGCConcurrency: "tikv_gc_concurrency", + variable.TiDBGCEnable: "tikv_gc_enable", + variable.TiDBGCScanLockMode: "tikv_gc_scan_lock_mode", +} + +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) // Session context, it is consistent with the lifecycle of a client connection. type Session interface { sessionctx.Context @@ -956,15 +967,19 @@ func drainRecordSet(ctx context.Context, se *session, rs sqlexec.RecordSet) ([]c } } -// getExecRet executes restricted sql and the result is one column. +// getTableValue executes restricted sql and the result is one column. // It returns a string value. -func (s *session) getExecRet(ctx sessionctx.Context, sql string) (string, error) { - rows, fields, err := s.ExecRestrictedSQL(sql) +func (s *session) getTableValue(ctx context.Context, tblName string, varName string) (string, error) { + stmt, err := s.ParseWithParams(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?", mysql.SystemDB, tblName, varName) + if err != nil { + return "", err + } + rows, fields, err := s.ExecRestrictedStmt(ctx, stmt) if err != nil { return "", err } if len(rows) == 0 { - return "", executor.ErrResultIsEmpty + return "", errResultIsEmpty } d := rows[0].GetDatum(0, &fields[0].Column.FieldType) value, err := d.ToString() @@ -974,10 +989,45 @@ func (s *session) getExecRet(ctx sessionctx.Context, sql string) (string, error) return value, nil } +<<<<<<< HEAD // GetAllSysVars implements GlobalVarAccessor.GetAllSysVars interface. func (s *session) GetAllSysVars() (map[string]string, error) { if s.Value(sessionctx.Initing) != nil { return nil, nil +======= +var gcVariableComments = map[string]string{ + variable.TiDBGCRunInterval: "GC run interval, at least 10m, in Go format.", + variable.TiDBGCLifetime: "All versions within life time will not be collected by GC, at least 10m, in Go format.", + variable.TiDBGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", + variable.TiDBGCEnable: "Current GC enable status", + tiKVGCAutoConcurrency: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", + variable.TiDBGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", +} + +// replaceTableValue executes restricted sql updates the variable value +func (s *session) replaceTableValue(ctx context.Context, tblName string, varName, val string) error { + if tblName == mysql.TiDBTable { // maintain comment metadata + comment := gcVariableComments[varName] + stmt, err := s.ParseWithParams(ctx, `REPLACE INTO %n.%n (variable_name, variable_value, comment) VALUES (%?, %?, %?)`, mysql.SystemDB, tblName, varName, val, comment) + if err != nil { + return err + } + _, _, err = s.ExecRestrictedStmt(ctx, stmt) + return err + } + stmt, err := s.ParseWithParams(ctx, `REPLACE INTO %n.%n (variable_name, variable_value) VALUES (%?, %?)`, mysql.SystemDB, tblName, varName, val) + if err != nil { + return err + } + _, _, err = s.ExecRestrictedStmt(ctx, stmt) + return err +} + +func (s *session) varFromTiDBTable(name string) bool { + switch name { + case variable.TiDBGCConcurrency, variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, variable.TiDBGCScanLockMode: + return true +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) } sql := `SELECT VARIABLE_NAME, VARIABLE_VALUE FROM %s.%s;` sql = fmt.Sprintf(sql, mysql.SystemDB, mysql.GlobalVariablesTable) @@ -1002,12 +1052,16 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { // When running bootstrap or upgrade, we should not access global storage. return "", nil } - sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM %s.%s WHERE VARIABLE_NAME="%s";`, - mysql.SystemDB, mysql.GlobalVariablesTable, name) - sysVar, err := s.getExecRet(s, sql) + sysVar, err := s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) if err != nil { +<<<<<<< HEAD if executor.ErrResultIsEmpty.Equal(err) { if sv, ok := variable.SysVars[name]; ok { +======= + if errResultIsEmpty.Equal(err) { + sv := variable.GetSysVar(name) + if sv != nil { +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) return sv.Value, nil } return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) @@ -1035,12 +1089,124 @@ func (s *session) SetGlobalSysVar(name, value string) error { return err } name = strings.ToLower(name) +<<<<<<< HEAD sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal) _, _, err = s.ExecRestrictedSQL(sql) return err } +======= + // update mysql.tidb if required. + if s.varFromTiDBTable(name) { + if err = s.setTiDBTableValue(name, sVal); err != nil { + return err + } + } + variable.CheckDeprecationSetSystemVar(s.sessionVars, name) + stmt, err := s.ParseWithParams(context.TODO(), "REPLACE %n.%n VALUES (%?, %?)", mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal) + if err != nil { + return err + } + _, _, err = s.ExecRestrictedStmt(context.TODO(), stmt) + return err +} + +// setTiDBTableValue handles tikv_* sysvars which need to update mysql.tidb +// for backwards compatibility. Validation has already been performed. +func (s *session) setTiDBTableValue(name, val string) error { + if name == variable.TiDBGCConcurrency { + autoConcurrency := "false" + if val == "-1" { + autoConcurrency = "true" + } + err := s.replaceTableValue(context.TODO(), mysql.TiDBTable, tiKVGCAutoConcurrency, autoConcurrency) + if err != nil { + return err + } + } + val = onOffToTrueFalse(val) + err := s.replaceTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name], val) + return err +} + +// In mysql.tidb the convention has been to store the string value "true"/"false", +// but sysvars use the convention ON/OFF. +func trueFalseToOnOff(str string) string { + if strings.EqualFold("true", str) { + return variable.BoolOn + } else if strings.EqualFold("false", str) { + return variable.BoolOff + } + return str +} + +// In mysql.tidb the convention has been to store the string value "true"/"false", +// but sysvars use the convention ON/OFF. +func onOffToTrueFalse(str string) string { + if strings.EqualFold("ON", str) { + return "true" + } else if strings.EqualFold("OFF", str) { + return "false" + } + return str +} + +// getTiDBTableValue handles tikv_* sysvars which need +// to read from mysql.tidb for backwards compatibility. +func (s *session) getTiDBTableValue(name, val string) (string, error) { + if name == variable.TiDBGCConcurrency { + // Check if autoconcurrency is set + autoConcurrencyVal, err := s.getTableValue(context.TODO(), mysql.TiDBTable, tiKVGCAutoConcurrency) + if err == nil && strings.EqualFold(autoConcurrencyVal, "true") { + return "-1", nil // convention for "AUTO" + } + } + tblValue, err := s.getTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name]) + if err != nil { + return val, nil // mysql.tidb value does not exist. + } + // Run validation on the tblValue. This will return an error if it can't be validated, + // but will also make it more consistent: disTribuTeD -> DISTRIBUTED etc + tblValue = trueFalseToOnOff(tblValue) + validatedVal, err := variable.ValidateSetSystemVar(s.sessionVars, name, tblValue, variable.ScopeGlobal) + if err != nil { + logutil.Logger(context.Background()).Warn("restoring sysvar value since validating mysql.tidb value failed", + zap.Error(err), + zap.String("name", name), + zap.String("tblName", gcVariableMap[name]), + zap.String("tblValue", tblValue), + zap.String("restoredValue", val)) + err = s.replaceTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name], val) + return val, err + } + if validatedVal != val { + // The sysvar value is out of sync. + err = s.replaceTableValue(context.TODO(), mysql.GlobalVariablesTable, gcVariableMap[name], validatedVal) + return validatedVal, err + } + return validatedVal, nil +} + +func (s *session) ensureFullGlobalStats() error { + stmt, err := s.ParseWithParams(context.TODO(), `select count(1) from information_schema.tables t where t.create_options = 'partitioned' + and not exists (select 1 from mysql.stats_meta m where m.table_id = t.tidb_table_id)`) + if err != nil { + return err + } + rows, _, err := s.ExecRestrictedStmt(context.TODO(), stmt) + if err != nil { + return err + } + row := rows[0] + count := row.GetInt64(0) + if count > 0 { + return errors.New("need analyze all partition table in 'static-collect-dynamic' mode before switch to 'dynamic-only'") + } + return nil +} + +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string) ([]ast.StmtNode, []error, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("session.ParseSQL", opentracing.ChildOf(span.Context())) @@ -1693,11 +1859,6 @@ func CreateSessionWithOpt(store kv.Storage, opt *Opt) (Session, error) { return s, nil } -// loadSystemTZ loads systemTZ from mysql.tidb -func loadSystemTZ(se *session) (string, error) { - return loadParameter(se, "system_tz") -} - // loadCollationParameter loads collation parameter from mysql.tidb func loadCollationParameter(se *session) (bool, error) { para, err := loadParameter(se, tidbNewCollationEnabled) @@ -1741,6 +1902,7 @@ var ( // loadParameter loads read-only parameter from mysql.tidb func loadParameter(se *session, name string) (string, error) { +<<<<<<< HEAD sql := "select variable_value from mysql.tidb where variable_name = '" + name + "'" rss, errLoad := se.Execute(context.Background(), sql) if errLoad != nil { @@ -1760,6 +1922,9 @@ func loadParameter(se *session, name string) (string, error) { return "", errResultIsEmpty } return req.GetRow(0).GetString(0), nil +======= + return se.getTableValue(context.TODO(), mysql.TiDBTable, name) +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) } // BootstrapSession runs the first time when the TiDB server start. @@ -1777,8 +1942,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } } - initLoadCommonGlobalVarsSQL() - ver := getStoreBootstrapVersion(store) if ver == notBootstrapped { runInBootstrapSession(store, bootstrap) @@ -1790,8 +1953,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { if err != nil { return nil, err } + // get system tz from mysql.tidb - tz, err := loadSystemTZ(se) + tz, err := se.getTableValue(context.TODO(), mysql.TiDBTable, "system_tz") if err != nil { return nil, err } @@ -2118,24 +2282,8 @@ var builtinGlobalVariable = []string{ variable.TiDBMultiStatementMode, } -var ( - loadCommonGlobalVarsSQLOnce sync.Once - loadCommonGlobalVarsSQL string -) - -func initLoadCommonGlobalVarsSQL() { - loadCommonGlobalVarsSQLOnce.Do(func() { - vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) - if len(variable.PluginVarNames) > 0 { - vars = append(vars, variable.PluginVarNames...) - } - loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')" - }) -} - // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { - initLoadCommonGlobalVarsSQL() vars := s.sessionVars if vars.CommonGlobalLoaded { return nil @@ -2150,7 +2298,17 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { // When a lot of connections connect to TiDB simultaneously, it can protect TiKV meta region from overload. gvc := domain.GetDomain(s).GetGlobalVarsCache() loadFunc := func() ([]chunk.Row, []*ast.ResultField, error) { - return s.ExecRestrictedSQL(loadCommonGlobalVarsSQL) + vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) + if len(variable.PluginVarNames) > 0 { + vars = append(vars, variable.PluginVarNames...) + } + + stmt, err := s.ParseWithParams(context.TODO(), "select HIGH_PRIORITY * from mysql.global_variables where variable_name in (%?)", vars) + if err != nil { + return nil, nil, errors.Trace(err) + } + + return s.ExecRestrictedStmt(context.TODO(), stmt) } rows, fields, err := gvc.LoadGlobalVariables(loadFunc) if err != nil { diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 3c681aafb3804..4be4abfb55d40 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -201,6 +201,147 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { return nil } +<<<<<<< HEAD +======= +// UpdateSessionVar updates the necessary session variables for the stats reader. +func (h *Handle) UpdateSessionVar() error { + h.mu.Lock() + defer h.mu.Unlock() + verInString, err := h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion) + if err != nil { + return err + } + ver, err := strconv.ParseInt(verInString, 10, 64) + if err != nil { + return err + } + h.mu.ctx.GetSessionVars().AnalyzeVersion = int(ver) + return err +} + +// GlobalStats is used to store the statistics contained in the global-level stats +// which is generated by the merge of partition-level stats. +// It will both store the column stats and index stats. +// In the column statistics, the variable `num` is equal to the number of columns in the partition table. +// In the index statistics, the variable `num` is always equal to one. +type GlobalStats struct { + Num int + Count int64 + Hg []*statistics.Histogram + Cms []*statistics.CMSketch + TopN []*statistics.TopN +} + +// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableID. +func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, is infoschema.InfoSchema, physicalID int64, isIndex int, idxID int64) (globalStats *GlobalStats, err error) { + // get the partition table IDs + h.mu.Lock() + globalTable, ok := h.getTableByPhysicalID(is, physicalID) + h.mu.Unlock() + if !ok { + err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID) + return + } + globalTableInfo := globalTable.Meta() + partitionNum := globalTableInfo.Partition.Num + partitionIDs := make([]int64, 0, partitionNum) + for i := uint64(0); i < partitionNum; i++ { + partitionIDs = append(partitionIDs, globalTableInfo.Partition.Definitions[i].ID) + } + + // initialized the globalStats + globalStats = new(GlobalStats) + if isIndex == 0 { + globalStats.Num = len(globalTableInfo.Columns) + } else { + globalStats.Num = 1 + } + globalStats.Count = 0 + globalStats.Hg = make([]*statistics.Histogram, globalStats.Num) + globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num) + globalStats.TopN = make([]*statistics.TopN, globalStats.Num) + + // The first dimension of slice is means the number of column or index stats in the globalStats. + // The second dimension of slice is means the number of partition tables. + // Because all topN and histograms need to be collected before they can be merged. + // So we should store all of the partition-level stats first, and merge them together. + allHg := make([][]*statistics.Histogram, globalStats.Num) + allCms := make([][]*statistics.CMSketch, globalStats.Num) + allTopN := make([][]*statistics.TopN, globalStats.Num) + for i := 0; i < globalStats.Num; i++ { + allHg[i] = make([]*statistics.Histogram, 0, partitionNum) + allCms[i] = make([]*statistics.CMSketch, 0, partitionNum) + allTopN[i] = make([]*statistics.TopN, 0, partitionNum) + } + + for _, partitionID := range partitionIDs { + h.mu.Lock() + partitionTable, ok := h.getTableByPhysicalID(is, partitionID) + h.mu.Unlock() + if !ok { + err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) + return + } + tableInfo := partitionTable.Meta() + var partitionStats *statistics.Table + partitionStats, err = h.TableStatsFromStorage(tableInfo, partitionID, false, 0) + if err != nil { + return + } + if partitionStats == nil { + err = errors.Errorf("[stats] error occurred when read partition-level stats of the table with tableID %d and partitionID %d", physicalID, partitionID) + return + } + globalStats.Count += partitionStats.Count + for i := 0; i < globalStats.Num; i++ { + ID := tableInfo.Columns[i].ID + if isIndex != 0 { + // If the statistics is the index stats, we should use the index ID to replace the column ID. + ID = idxID + } + hg, cms, topN := partitionStats.GetStatsInfo(ID, isIndex == 1) + allHg[i] = append(allHg[i], hg) + allCms[i] = append(allCms[i], cms) + allTopN[i] = append(allTopN[i], topN) + } + } + + // After collect all of the statistics from the partition-level stats, + // we should merge them together. + for i := 0; i < globalStats.Num; i++ { + // Merge CMSketch + globalStats.Cms[i] = allCms[i][0].Copy() + for j := uint64(1); j < partitionNum; j++ { + err = globalStats.Cms[i].MergeCMSketch(allCms[i][j]) + if err != nil { + return + } + } + + // Merge topN. We need to merge TopN before merging the histogram. + // Because after merging TopN, some numbers will be left. + // These left numbers should be inserted into the histogram. + err = errors.Errorf("TODO: The merge function of the topN structure has not been implemented yet") + if err != nil { + return + } + + // Merge histogram + globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc, allHg[i], 0) + if err != nil { + return + } + + // Merge NDV + err = errors.Errorf("TODO: The merge function of the NDV has not been implemented yet") + if err != nil { + return + } + } + return +} + +>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) { if is.SchemaMetaVersion() != h.mu.schemaVersion { h.mu.schemaVersion = is.SchemaMetaVersion() diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index e5f76ec4d64dd..2aec7d71aa9dc 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -137,7 +137,7 @@ func (tk *TestKit) GetConnectionID() { } } -// Exec executes a sql statement. +// Exec executes a sql statement using the prepared stmt API func (tk *TestKit) Exec(sql string, args ...interface{}) (sqlexec.RecordSet, error) { var err error if tk.Se == nil { From 4094264c71cdf7e8b75a71df6c50fb3332fead6a Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 7 Mar 2021 16:47:13 -0700 Subject: [PATCH 2/3] Merge changes into 4.0 --- session/bootstrap.go | 153 ++----------------------- session/session.go | 221 ++++-------------------------------- statistics/handle/handle.go | 141 ----------------------- util/testkit/testkit.go | 2 +- 4 files changed, 35 insertions(+), 482 deletions(-) diff --git a/session/bootstrap.go b/session/bootstrap.go index e1ec00d6a8ec5..dc10cbe2e3a3d 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -578,13 +578,9 @@ func upgradeToVer3(s Session, ver int64) { return } // Version 3 fix tx_read_only variable value. -<<<<<<< HEAD sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %s.%s set variable_value = '0' where variable_name = 'tx_read_only';", mysql.SystemDB, mysql.GlobalVariablesTable) mustExecute(s, sql) -======= - mustExecute(s, "UPDATE HIGH_PRIORITY %n.%n SET variable_value = '0' WHERE variable_name = 'tx_read_only';", mysql.SystemDB, mysql.GlobalVariablesTable) ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) } // upgradeToVer4 updates to version 4. @@ -592,7 +588,8 @@ func upgradeToVer4(s Session, ver int64) { if ver >= version4 { return } - mustExecute(s, CreateStatsMetaTable) + sql := CreateStatsMetaTable + mustExecute(s, sql) } func upgradeToVer5(s Session, ver int64) { @@ -626,11 +623,7 @@ func upgradeToVer8(s Session, ver int64) { return } // This is a dummy upgrade, it checks whether upgradeToVer7 success, if not, do it again. -<<<<<<< HEAD - if _, err := s.Execute(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` from mysql.user limit 0"); err == nil { -======= - if _, err := s.ExecuteInternal(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` FROM mysql.user LIMIT 0"); err == nil { ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) + if _, err := s.ExecuteInternal(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` from mysql.user limit 0"); err == nil { return } upgradeToVer7(s, ver) @@ -672,11 +665,7 @@ func upgradeToVer11(s Session, ver int64) { if ver >= version11 { return } -<<<<<<< HEAD - _, err := s.Execute(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` enum('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`") -======= - _, err := s.ExecuteInternal(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`") ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) + _, err := s.ExecuteInternal(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` enum('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`") if err != nil { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { return @@ -1055,23 +1044,14 @@ func upgradeToVer42(s Session, ver int64) { // Convert statement summary global variables to non-empty values. func writeStmtSummaryVars(s Session) { - sql := "UPDATE %n.%n SET variable_value= %? WHERE variable_name= %? AND variable_value=''" + sql := "UPDATE mysql.global_variables SET variable_value= %? WHERE variable_name= %? AND variable_value=''" stmtSummaryConfig := config.GetGlobalConfig().StmtSummary -<<<<<<< HEAD - mustExecute(s, fmt.Sprintf(sql, variable.BoolToIntStr(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary)) - mustExecute(s, fmt.Sprintf(sql, variable.BoolToIntStr(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery)) - mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval)) - mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize)) - mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount)) - mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength)) -======= - mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary) - mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery) - mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval) - mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize) - mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount) - mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength) ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) + mustExecute(s, sql, variable.BoolToIntStr(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary) + mustExecute(s, sql, variable.BoolToIntStr(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery) + mustExecute(s, sql, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval) + mustExecute(s, sql, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize) + mustExecute(s, sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount) + mustExecute(s, sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength) } func upgradeToVer43(s Session, ver int64) { @@ -1139,73 +1119,8 @@ func upgradeToVer49(s Session, ver int64) { } } -<<<<<<< HEAD func upgradeToVer50(s Session, ver int64) { if ver >= version50 { -======= -// When cherry-pick upgradeToVer52 to v4.0, we wrongly name it upgradeToVer48. -// If we upgrade from v4.0 to a newer version, the real upgradeToVer48 will be missed. -// So we redo upgradeToVer48 here to make sure the upgrading from v4.0 succeeds. -func upgradeToVer55(s Session, ver int64) { - if ver >= version55 { - return - } - defValues := map[string]string{ - variable.TiDBIndexLookupConcurrency: "4", - variable.TiDBIndexLookupJoinConcurrency: "4", - variable.TiDBHashAggFinalConcurrency: "4", - variable.TiDBHashAggPartialConcurrency: "4", - variable.TiDBWindowConcurrency: "4", - variable.TiDBProjectionConcurrency: "4", - variable.TiDBHashJoinConcurrency: "5", - } - names := make([]string, 0, len(defValues)) - for n := range defValues { - names = append(names, n) - } - - selectSQL := "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(names, quoteCommaQuote) + "')" - ctx := context.Background() - rs, err := s.ExecuteInternal(ctx, selectSQL) - terror.MustNil(err) - defer terror.Call(rs.Close) - req := rs.NewChunk() - it := chunk.NewIterator4Chunk(req) - err = rs.Next(ctx, req) - for err == nil && req.NumRows() != 0 { - for row := it.Begin(); row != it.End(); row = it.Next() { - n := strings.ToLower(row.GetString(0)) - v := row.GetString(1) - if defValue, ok := defValues[n]; !ok || defValue != v { - return - } - } - err = rs.Next(ctx, req) - } - terror.MustNil(err) - - mustExecute(s, "BEGIN") - v := strconv.Itoa(variable.ConcurrencyUnset) - sql := fmt.Sprintf("UPDATE %s.%s SET variable_value='%%s' WHERE variable_name='%%s'", mysql.SystemDB, mysql.GlobalVariablesTable) - for _, name := range names { - mustExecute(s, fmt.Sprintf(sql, v, name)) - } - mustExecute(s, "COMMIT") -} - -// When cherry-pick upgradeToVer54 to v4.0, we wrongly name it upgradeToVer49. -// If we upgrade from v4.0 to a newer version, the real upgradeToVer49 will be missed. -// So we redo upgradeToVer49 here to make sure the upgrading from v4.0 succeeds. -func upgradeToVer56(s Session, ver int64) { - if ver >= version56 { - return - } - doReentrantDDL(s, CreateStatsExtended) -} - -func upgradeToVer57(s Session, ver int64) { - if ver >= version57 { ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) return } insertBuiltinBindInfoRow(s) @@ -1321,52 +1236,12 @@ func updateBindInfo(iter *chunk.Iterator4Chunk, p *parser.Parser, bindMap map[st } func writeMemoryQuotaQuery(s Session) { -<<<<<<< HEAD comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x" - sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%d', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%d'`, - mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30) - mustExecute(s, sql) -} - -======= - comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`, mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30, ) } -func upgradeToVer62(s Session, ver int64) { - if ver >= version62 { - return - } - doReentrantDDL(s, "ALTER TABLE mysql.stats_buckets ADD COLUMN `ndv` bigint not null default 0", infoschema.ErrColumnExists) -} - -func upgradeToVer63(s Session, ver int64) { - if ver >= version63 { - return - } - doReentrantDDL(s, "ALTER TABLE mysql.user ADD COLUMN `Create_tablespace_priv` ENUM('N','Y') DEFAULT 'N'", infoschema.ErrColumnExists) - mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Create_tablespace_priv='Y' where Super_priv='Y'") -} - -func upgradeToVer64(s Session, ver int64) { - if ver >= version64 { - return - } - doReentrantDDL(s, "ALTER TABLE mysql.user ADD COLUMN `Repl_slave_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Execute_priv`", infoschema.ErrColumnExists) - doReentrantDDL(s, "ALTER TABLE mysql.user ADD COLUMN `Repl_client_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Repl_slave_priv`", infoschema.ErrColumnExists) - mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Repl_slave_priv='Y',Repl_client_priv='Y'") -} - -func writeOOMAction(s Session) { - comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" - mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, - mysql.SystemDB, mysql.TiDBTable, tidbDefOOMAction, config.OOMActionLog, comment, config.OOMActionLog, - ) -} - ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) // updateBootstrapVer updates bootstrap version variable in mysql.TiDB table. func updateBootstrapVer(s Session) { // Update bootstrap version. @@ -1434,7 +1309,6 @@ func doDDLWorks(s Session) { // doDMLWorks executes DML statements in bootstrap stage. // All the statements run in a single transaction. -// TODO: sanitize. func doDMLWorks(s Session) { mustExecute(s, "BEGIN") @@ -1495,13 +1369,8 @@ func doDMLWorks(s Session) { } } -<<<<<<< HEAD -func mustExecute(s Session, sql string) { - _, err := s.Execute(context.Background(), sql) -======= func mustExecute(s Session, sql string, args ...interface{}) { _, err := s.ExecuteInternal(context.Background(), sql, args...) ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) if err != nil { debug.PrintStack() logutil.BgLogger().Fatal("mustExecute error", zap.Error(err)) diff --git a/session/session.go b/session/session.go index 1a284e5f9face..1413b50426914 100644 --- a/session/session.go +++ b/session/session.go @@ -92,17 +92,6 @@ var ( sessionExecuteParseDurationGeneral = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblGeneral) ) -<<<<<<< HEAD -======= -var gcVariableMap = map[string]string{ - variable.TiDBGCRunInterval: "tikv_gc_run_interval", - variable.TiDBGCLifetime: "tikv_gc_life_time", - variable.TiDBGCConcurrency: "tikv_gc_concurrency", - variable.TiDBGCEnable: "tikv_gc_enable", - variable.TiDBGCScanLockMode: "tikv_gc_scan_lock_mode", -} - ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) // Session context, it is consistent with the lifecycle of a client connection. type Session interface { sessionctx.Context @@ -966,49 +955,16 @@ func (s *session) getTableValue(ctx context.Context, tblName string, varName str return value, nil } -<<<<<<< HEAD // GetAllSysVars implements GlobalVarAccessor.GetAllSysVars interface. func (s *session) GetAllSysVars() (map[string]string, error) { if s.Value(sessionctx.Initing) != nil { return nil, nil -======= -var gcVariableComments = map[string]string{ - variable.TiDBGCRunInterval: "GC run interval, at least 10m, in Go format.", - variable.TiDBGCLifetime: "All versions within life time will not be collected by GC, at least 10m, in Go format.", - variable.TiDBGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", - variable.TiDBGCEnable: "Current GC enable status", - tiKVGCAutoConcurrency: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", - variable.TiDBGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", -} - -// replaceTableValue executes restricted sql updates the variable value -func (s *session) replaceTableValue(ctx context.Context, tblName string, varName, val string) error { - if tblName == mysql.TiDBTable { // maintain comment metadata - comment := gcVariableComments[varName] - stmt, err := s.ParseWithParams(ctx, `REPLACE INTO %n.%n (variable_name, variable_value, comment) VALUES (%?, %?, %?)`, mysql.SystemDB, tblName, varName, val, comment) - if err != nil { - return err - } - _, _, err = s.ExecRestrictedStmt(ctx, stmt) - return err } - stmt, err := s.ParseWithParams(ctx, `REPLACE INTO %n.%n (variable_name, variable_value) VALUES (%?, %?)`, mysql.SystemDB, tblName, varName, val) + stmt, err := s.ParseWithParams(context.TODO(), `SELECT VARIABLE_NAME, VARIABLE_VALUE FROM %n.%n`, mysql.SystemDB, mysql.GlobalVariablesTable) if err != nil { - return err - } - _, _, err = s.ExecRestrictedStmt(ctx, stmt) - return err -} - -func (s *session) varFromTiDBTable(name string) bool { - switch name { - case variable.TiDBGCConcurrency, variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, variable.TiDBGCScanLockMode: - return true ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) + return nil, err } - sql := `SELECT VARIABLE_NAME, VARIABLE_VALUE FROM %s.%s;` - sql = fmt.Sprintf(sql, mysql.SystemDB, mysql.GlobalVariablesTable) - rows, _, err := s.ExecRestrictedSQL(sql) + rows, _, err := s.ExecRestrictedStmt(context.TODO(), stmt) if err != nil { return nil, err } @@ -1031,14 +987,8 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { } sysVar, err := s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) if err != nil { -<<<<<<< HEAD - if executor.ErrResultIsEmpty.Equal(err) { - if sv, ok := variable.SysVars[name]; ok { -======= if errResultIsEmpty.Equal(err) { - sv := variable.GetSysVar(name) - if sv != nil { ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) + if sv, ok := variable.SysVars[name]; ok { return sv.Value, nil } return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) @@ -1066,21 +1016,6 @@ func (s *session) SetGlobalSysVar(name, value string) error { return err } name = strings.ToLower(name) -<<<<<<< HEAD - sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal) - _, _, err = s.ExecRestrictedSQL(sql) - return err -} - -======= - // update mysql.tidb if required. - if s.varFromTiDBTable(name) { - if err = s.setTiDBTableValue(name, sVal); err != nil { - return err - } - } - variable.CheckDeprecationSetSystemVar(s.sessionVars, name) stmt, err := s.ParseWithParams(context.TODO(), "REPLACE %n.%n VALUES (%?, %?)", mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal) if err != nil { return err @@ -1089,101 +1024,6 @@ func (s *session) SetGlobalSysVar(name, value string) error { return err } -// setTiDBTableValue handles tikv_* sysvars which need to update mysql.tidb -// for backwards compatibility. Validation has already been performed. -func (s *session) setTiDBTableValue(name, val string) error { - if name == variable.TiDBGCConcurrency { - autoConcurrency := "false" - if val == "-1" { - autoConcurrency = "true" - } - err := s.replaceTableValue(context.TODO(), mysql.TiDBTable, tiKVGCAutoConcurrency, autoConcurrency) - if err != nil { - return err - } - } - val = onOffToTrueFalse(val) - err := s.replaceTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name], val) - return err -} - -// In mysql.tidb the convention has been to store the string value "true"/"false", -// but sysvars use the convention ON/OFF. -func trueFalseToOnOff(str string) string { - if strings.EqualFold("true", str) { - return variable.BoolOn - } else if strings.EqualFold("false", str) { - return variable.BoolOff - } - return str -} - -// In mysql.tidb the convention has been to store the string value "true"/"false", -// but sysvars use the convention ON/OFF. -func onOffToTrueFalse(str string) string { - if strings.EqualFold("ON", str) { - return "true" - } else if strings.EqualFold("OFF", str) { - return "false" - } - return str -} - -// getTiDBTableValue handles tikv_* sysvars which need -// to read from mysql.tidb for backwards compatibility. -func (s *session) getTiDBTableValue(name, val string) (string, error) { - if name == variable.TiDBGCConcurrency { - // Check if autoconcurrency is set - autoConcurrencyVal, err := s.getTableValue(context.TODO(), mysql.TiDBTable, tiKVGCAutoConcurrency) - if err == nil && strings.EqualFold(autoConcurrencyVal, "true") { - return "-1", nil // convention for "AUTO" - } - } - tblValue, err := s.getTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name]) - if err != nil { - return val, nil // mysql.tidb value does not exist. - } - // Run validation on the tblValue. This will return an error if it can't be validated, - // but will also make it more consistent: disTribuTeD -> DISTRIBUTED etc - tblValue = trueFalseToOnOff(tblValue) - validatedVal, err := variable.ValidateSetSystemVar(s.sessionVars, name, tblValue, variable.ScopeGlobal) - if err != nil { - logutil.Logger(context.Background()).Warn("restoring sysvar value since validating mysql.tidb value failed", - zap.Error(err), - zap.String("name", name), - zap.String("tblName", gcVariableMap[name]), - zap.String("tblValue", tblValue), - zap.String("restoredValue", val)) - err = s.replaceTableValue(context.TODO(), mysql.TiDBTable, gcVariableMap[name], val) - return val, err - } - if validatedVal != val { - // The sysvar value is out of sync. - err = s.replaceTableValue(context.TODO(), mysql.GlobalVariablesTable, gcVariableMap[name], validatedVal) - return validatedVal, err - } - return validatedVal, nil -} - -func (s *session) ensureFullGlobalStats() error { - stmt, err := s.ParseWithParams(context.TODO(), `select count(1) from information_schema.tables t where t.create_options = 'partitioned' - and not exists (select 1 from mysql.stats_meta m where m.table_id = t.tidb_table_id)`) - if err != nil { - return err - } - rows, _, err := s.ExecRestrictedStmt(context.TODO(), stmt) - if err != nil { - return err - } - row := rows[0] - count := row.GetInt64(0) - if count > 0 { - return errors.New("need analyze all partition table in 'static-collect-dynamic' mode before switch to 'dynamic-only'") - } - return nil -} - ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string) ([]ast.StmtNode, []error, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("session.ParseSQL", opentracing.ChildOf(span.Context())) @@ -2023,29 +1863,7 @@ var ( // loadParameter loads read-only parameter from mysql.tidb func loadParameter(se *session, name string) (string, error) { -<<<<<<< HEAD - sql := "select variable_value from mysql.tidb where variable_name = '" + name + "'" - rss, errLoad := se.Execute(context.Background(), sql) - if errLoad != nil { - return "", errLoad - } - // the record of mysql.tidb under where condition: variable_name = $name should shall only be one. - defer func() { - if err := rss[0].Close(); err != nil { - logutil.BgLogger().Error("close result set error", zap.Error(err)) - } - }() - req := rss[0].NewChunk() - if err := rss[0].Next(context.Background(), req); err != nil { - return "", err - } - if req.NumRows() == 0 { - return "", errResultIsEmpty - } - return req.GetRow(0).GetString(0), nil -======= return se.getTableValue(context.TODO(), mysql.TiDBTable, name) ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) } // BootstrapSession runs the first time when the TiDB server start. @@ -2063,6 +1881,8 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } } + initLoadCommonGlobalVarsSQL() + ver := getStoreBootstrapVersion(store) if ver == notBootstrapped { runInBootstrapSession(store, bootstrap) @@ -2074,7 +1894,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { if err != nil { return nil, err } - // get system tz from mysql.tidb tz, err := se.getTableValue(context.TODO(), mysql.TiDBTable, "system_tz") if err != nil { @@ -2403,8 +2222,24 @@ var builtinGlobalVariable = []string{ variable.TiDBMultiStatementMode, } +var ( + loadCommonGlobalVarsSQLOnce sync.Once + loadCommonGlobalVarsSQL string +) + +func initLoadCommonGlobalVarsSQL() { + loadCommonGlobalVarsSQLOnce.Do(func() { + vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) + if len(variable.PluginVarNames) > 0 { + vars = append(vars, variable.PluginVarNames...) + } + loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')" + }) +} + // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { + initLoadCommonGlobalVarsSQL() vars := s.sessionVars if vars.CommonGlobalLoaded { return nil @@ -2419,17 +2254,7 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { // When a lot of connections connect to TiDB simultaneously, it can protect TiKV meta region from overload. gvc := domain.GetDomain(s).GetGlobalVarsCache() loadFunc := func() ([]chunk.Row, []*ast.ResultField, error) { - vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) - if len(variable.PluginVarNames) > 0 { - vars = append(vars, variable.PluginVarNames...) - } - - stmt, err := s.ParseWithParams(context.TODO(), "select HIGH_PRIORITY * from mysql.global_variables where variable_name in (%?)", vars) - if err != nil { - return nil, nil, errors.Trace(err) - } - - return s.ExecRestrictedStmt(context.TODO(), stmt) + return s.ExecRestrictedSQL(loadCommonGlobalVarsSQL) } rows, fields, err := gvc.LoadGlobalVariables(loadFunc) if err != nil { diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 4be4abfb55d40..3c681aafb3804 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -201,147 +201,6 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { return nil } -<<<<<<< HEAD -======= -// UpdateSessionVar updates the necessary session variables for the stats reader. -func (h *Handle) UpdateSessionVar() error { - h.mu.Lock() - defer h.mu.Unlock() - verInString, err := h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion) - if err != nil { - return err - } - ver, err := strconv.ParseInt(verInString, 10, 64) - if err != nil { - return err - } - h.mu.ctx.GetSessionVars().AnalyzeVersion = int(ver) - return err -} - -// GlobalStats is used to store the statistics contained in the global-level stats -// which is generated by the merge of partition-level stats. -// It will both store the column stats and index stats. -// In the column statistics, the variable `num` is equal to the number of columns in the partition table. -// In the index statistics, the variable `num` is always equal to one. -type GlobalStats struct { - Num int - Count int64 - Hg []*statistics.Histogram - Cms []*statistics.CMSketch - TopN []*statistics.TopN -} - -// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableID. -func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, is infoschema.InfoSchema, physicalID int64, isIndex int, idxID int64) (globalStats *GlobalStats, err error) { - // get the partition table IDs - h.mu.Lock() - globalTable, ok := h.getTableByPhysicalID(is, physicalID) - h.mu.Unlock() - if !ok { - err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID) - return - } - globalTableInfo := globalTable.Meta() - partitionNum := globalTableInfo.Partition.Num - partitionIDs := make([]int64, 0, partitionNum) - for i := uint64(0); i < partitionNum; i++ { - partitionIDs = append(partitionIDs, globalTableInfo.Partition.Definitions[i].ID) - } - - // initialized the globalStats - globalStats = new(GlobalStats) - if isIndex == 0 { - globalStats.Num = len(globalTableInfo.Columns) - } else { - globalStats.Num = 1 - } - globalStats.Count = 0 - globalStats.Hg = make([]*statistics.Histogram, globalStats.Num) - globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num) - globalStats.TopN = make([]*statistics.TopN, globalStats.Num) - - // The first dimension of slice is means the number of column or index stats in the globalStats. - // The second dimension of slice is means the number of partition tables. - // Because all topN and histograms need to be collected before they can be merged. - // So we should store all of the partition-level stats first, and merge them together. - allHg := make([][]*statistics.Histogram, globalStats.Num) - allCms := make([][]*statistics.CMSketch, globalStats.Num) - allTopN := make([][]*statistics.TopN, globalStats.Num) - for i := 0; i < globalStats.Num; i++ { - allHg[i] = make([]*statistics.Histogram, 0, partitionNum) - allCms[i] = make([]*statistics.CMSketch, 0, partitionNum) - allTopN[i] = make([]*statistics.TopN, 0, partitionNum) - } - - for _, partitionID := range partitionIDs { - h.mu.Lock() - partitionTable, ok := h.getTableByPhysicalID(is, partitionID) - h.mu.Unlock() - if !ok { - err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) - return - } - tableInfo := partitionTable.Meta() - var partitionStats *statistics.Table - partitionStats, err = h.TableStatsFromStorage(tableInfo, partitionID, false, 0) - if err != nil { - return - } - if partitionStats == nil { - err = errors.Errorf("[stats] error occurred when read partition-level stats of the table with tableID %d and partitionID %d", physicalID, partitionID) - return - } - globalStats.Count += partitionStats.Count - for i := 0; i < globalStats.Num; i++ { - ID := tableInfo.Columns[i].ID - if isIndex != 0 { - // If the statistics is the index stats, we should use the index ID to replace the column ID. - ID = idxID - } - hg, cms, topN := partitionStats.GetStatsInfo(ID, isIndex == 1) - allHg[i] = append(allHg[i], hg) - allCms[i] = append(allCms[i], cms) - allTopN[i] = append(allTopN[i], topN) - } - } - - // After collect all of the statistics from the partition-level stats, - // we should merge them together. - for i := 0; i < globalStats.Num; i++ { - // Merge CMSketch - globalStats.Cms[i] = allCms[i][0].Copy() - for j := uint64(1); j < partitionNum; j++ { - err = globalStats.Cms[i].MergeCMSketch(allCms[i][j]) - if err != nil { - return - } - } - - // Merge topN. We need to merge TopN before merging the histogram. - // Because after merging TopN, some numbers will be left. - // These left numbers should be inserted into the histogram. - err = errors.Errorf("TODO: The merge function of the topN structure has not been implemented yet") - if err != nil { - return - } - - // Merge histogram - globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc, allHg[i], 0) - if err != nil { - return - } - - // Merge NDV - err = errors.Errorf("TODO: The merge function of the NDV has not been implemented yet") - if err != nil { - return - } - } - return -} - ->>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652) func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) { if is.SchemaMetaVersion() != h.mu.schemaVersion { h.mu.schemaVersion = is.SchemaMetaVersion() diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index 2aec7d71aa9dc..e5f76ec4d64dd 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -137,7 +137,7 @@ func (tk *TestKit) GetConnectionID() { } } -// Exec executes a sql statement using the prepared stmt API +// Exec executes a sql statement. func (tk *TestKit) Exec(sql string, args ...interface{}) (sqlexec.RecordSet, error) { var err error if tk.Se == nil { From 8ef9035ae30406f98660812aa030df7f9a91f636 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 7 Mar 2021 19:49:56 -0700 Subject: [PATCH 3/3] Remove useless error check --- session/bootstrap.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/session/bootstrap.go b/session/bootstrap.go index dc10cbe2e3a3d..ace9d791bf5d6 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -500,9 +500,6 @@ func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) { if err != nil { return "", true, errors.Trace(err) } - if rs == nil { - return "", true, errors.New("Wrong number of Recordset") - } defer terror.Call(rs.Close) req := rs.NewChunk() err = rs.Next(ctx, req)