Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,11 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
mock.MatchExpectationsInOrder(false)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand All @@ -510,13 +510,13 @@ func TestCheckTableEmpty(t *testing.T) {
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
// test auto retry retryable error
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnError(&gmysql.MySQLError{Number: errno.ErrPDServerTimeout})
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand All @@ -532,11 +532,11 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
// only need to check the one that is not in checkpoint
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
err = rc.checkTableEmpty(ctx)
require.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/restore/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,12 @@ func (g *TargetInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName stri
}
var dump int
err = exec.QueryRow(ctx, "check table empty",
fmt.Sprintf("SELECT 1 FROM %s LIMIT 1", common.UniqueTable(schemaName, tableName)),
// Here we use the `USE INDEX()` hint to skip fetch the record from index.
// In Lightning, if previous importing is halted half-way, it is possible that
// the data is partially imported, but the index data has not been imported.
// In this situation, if no hint is added, the SQL executor might fetch the record from index,
// which is empty. This will result in missing check.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's still possible that index data in partially imported.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's still possible that index data in partially imported.

If index data is partially imported, table-empty check will report errors on the second import even if USE INDEX() is not used. So it will be figured out by the current table-empty-check logic.

fmt.Sprintf("SELECT 1 FROM %s USE INDEX() LIMIT 1", common.UniqueTable(schemaName, tableName)),
&dump,
)

Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NoError(t, err)
require.Equal(t, lnConfig, targetGetter.cfg)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnError(&mysql_sql_driver.MySQLError{
Number: errno.ErrNoSuchTable,
Message: "Table 'test_db.test_tbl' doesn't exist",
Expand All @@ -772,7 +772,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, true, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(
sqlmock.NewRows([]string{"1"}).
RowError(0, sql.ErrNoRows),
Expand All @@ -782,7 +782,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, true, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(
sqlmock.NewRows([]string{"1"}).AddRow(1),
)
Expand All @@ -791,7 +791,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, false, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnError(errors.New("some dummy error"))
_, err = targetGetter.IsTableEmpty(ctx, "test_db", "test_tbl")
require.Error(t, err)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
if cp.Status < checkpoints.CheckpointStatusIndexImported {
var err error
if indexEngineCp.Status < checkpoints.CheckpointStatusImported {
failpoint.Inject("FailBeforeStartImportingIndexEngine", func() {
errMsg := "fail before importing index KV data"
tr.logger.Warn(errMsg)
failpoint.Return(errors.New(errMsg))
})
err = tr.importKV(ctx, closedIndexEngine, rc, indexEngineID)
failpoint.Inject("FailBeforeIndexEngineImported", func() {
finished := rc.status.FinishedFileSize.Load()
Expand Down
5 changes: 5 additions & 0 deletions br/tests/lightning_check_partial_imported/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[tikv-importer]
backend = "local"

[mydumper.csv]
header = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE tbl01 (
`id` INTEGER,
`val` VARCHAR(64),
`aaa` CHAR(66) DEFAULT NULL,
`bbb` CHAR(10) NOT NULL,
`ccc` CHAR(42) DEFAULT NULL,
`ddd` CHAR(42) DEFAULT NULL,
`eee` CHAR(66) DEFAULT NULL,
`fff` VARCHAR(128) DEFAULT NULL,
KEY `aaa` (`aaa`),
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
6 changes: 6 additions & 0 deletions br/tests/lightning_check_partial_imported/data/db01.tbl01.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,val,aaa,bbb,ccc,ddd,eee,fff
1,"v01","a01","b01","c01","d01","e01","f01"
2,"v02","a02","b02","c02","d02","e02","f02"
3,"v03","a03","b03","c03","d03","e03","f03"
4,"v04","a04","b04","c04","d04","e04","f04"
5,"v05","a05","b05","c05","d05","e05","f05"
47 changes: 47 additions & 0 deletions br/tests/lightning_check_partial_imported/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/bin/bash
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

MYDIR=$(dirname "${BASH_SOURCE[0]}")
set -eux

check_cluster_version 4 0 0 'local backend' || exit 0

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/FailBeforeStartImportingIndexEngine=return"
set +e
if run_lightning; then
echo "The first import doesn't fail as expected" >&2
exit 1
fi
set -e

data_records=$(tail -n +2 "${MYDIR}/data/db01.tbl01.csv" | wc -l | xargs echo )
run_sql "SELECT COUNT(*) FROM db01.tbl01 USE INDEX();"
check_contains "${data_records}"

export GO_FAILPOINTS=""
set +e
if run_lightning --check-requirements=1; then
echo "The pre-check doesn't find out the non-empty table problem"
exit 2
fi
set -e

run_sql "TRUNCATE TABLE db01.tbl01;"
run_lightning --check-requirements=1
run_sql "SELECT COUNT(*) FROM db01.tbl01;"
check_contains "${data_records}"
run_sql "SELECT COUNT(*) FROM db01.tbl01 USE INDEX();"
check_contains "${data_records}"