Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ static: prepare tools
@# exhaustivestruct - Protobuf structs have hidden fields, like "XXX_NoUnkeyedLiteral"
@# exhaustive - no need to check exhaustiveness of enum switch statements
@# gosec - too many false positive
@# errorlint - pingcap/errors is incompatible with std errors.
@# wrapcheck - there are too many unwrapped errors in tidb-lightning
CGO_ENABLED=0 tools/bin/golangci-lint run --enable-all --deadline 120s \
--disable gochecknoglobals \
--disable goimports \
Expand All @@ -202,7 +204,9 @@ static: prepare tools
--disable exhaustive \
--disable godot \
--disable gosec \
$$($(PACKAGE_DIRECTORIES) | grep -v "lightning")
--disable errorlint \
--disable wrapcheck \
$(PACKAGE_DIRECTORIES)
# pingcap/errors APIs are mixed with multiple patterns 'pkg/errors',
# 'juju/errors' and 'pingcap/parser'. To avoid confusion and mistake,
# we only allow a subset of APIs, that's "Normalize|Annotate|Trace|Cause".
Expand Down
9 changes: 5 additions & 4 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func run() error {
// there is a check if `-d` points to a valid storage, and '' is not.
// since tidb-lightning-ctl does not need `-d` we change the default to a valid but harmless value.
dFlag := fs.Lookup("d")
dFlag.Value.Set("noop://")
_ = dFlag.Value.Set("noop://")
dFlag.DefValue = "noop://"

compact = fs.Bool("compact", false, "do manual compaction on the target cluster")
Expand Down Expand Up @@ -247,8 +247,8 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err)
lastErr = err
} else {
closedEngine.Cleanup(ctx)
} else if err := closedEngine.Cleanup(ctx); err != nil {
lastErr = err
}
}
}
Expand All @@ -263,7 +263,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
_, eID := backend.MakeUUID(table.TableName, engineID)
file := local.File{Uuid: eID}
file := local.File{UUID: eID}
err := file.Cleanup(cfg.TikvImporter.SortedKVDir)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while cleanup engine:", err)
Expand Down Expand Up @@ -321,6 +321,7 @@ func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string)
}

func getLocalStoringTables(ctx context.Context, cfg *config.Config) (err2 error) {
//nolint:prealloc // This is a placeholder.
var tables []string
defer func() {
if err2 == nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/tidb-lightning-ctl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"
)

func TestRunMain(t *testing.T) {
func TestRunMain(_ *testing.T) {
if _, isIntegrationTest := os.LookupEnv("INTEGRATION_TEST"); !isIntegrationTest {
// override exit to pass unit test.
exit = func(code int) {}
Expand Down
11 changes: 5 additions & 6 deletions cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,12 @@ func main() {
err = func() error {
if globalCfg.App.ServerMode {
return app.RunServer()
} else {
cfg := config.NewConfig()
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return err
}
return app.RunOnce(context.Background(), cfg, nil)
}
cfg := config.NewConfig()
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return err
}
return app.RunOnce(context.Background(), cfg, nil)
}()

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/tidb-lightning/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
)

func TestRunMain(t *testing.T) {
func TestRunMain(_ *testing.T) {
if _, isIntegrationTest := os.LookupEnv("INTEGRATION_TEST"); !isIntegrationTest {
// override exit to pass unit test.
exit = func(code int) {}
Expand Down
3 changes: 2 additions & 1 deletion pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Test(t *testing.T) {
// FIXME: Cannot use the real SetUpTest/TearDownTest to set up the mock
// otherwise the mock error will be ignored.

func (s *backendSuite) setUpTest(c *C) {
func (s *backendSuite) setUpTest(c gomock.TestReporter) {
s.controller = gomock.NewController(c)
s.mockBackend = mock.NewMockBackend(s.controller)
s.backend = backend.MakeBackend(s.mockBackend)
Expand Down Expand Up @@ -298,6 +298,7 @@ func (s *backendSuite) TestImportFailedRecovered(c *C) {
c.Assert(err, IsNil)
}

//nolint:interfacer // change test case signature causes check panicking.
func (s *backendSuite) TestClose(c *C) {
s.setUpTest(c)
defer s.tearDownTest()
Expand Down
9 changes: 5 additions & 4 deletions pkg/lightning/backend/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ outside:

func (importer *importer) WriteRowsToImporter(
ctx context.Context,
//nolint:interfacer // false positive
engineUUID uuid.UUID,
ts uint64,
rows kv.Rows,
Expand Down Expand Up @@ -326,18 +327,18 @@ func (importer *importer) ResetEngine(context.Context, uuid.UUID) error {
}

func (importer *importer) LocalWriter(ctx context.Context, engineUUID uuid.UUID) (backend.EngineWriter, error) {
return &ImporterWriter{importer: importer, engineUUID: engineUUID}, nil
return &Writer{importer: importer, engineUUID: engineUUID}, nil
}

type ImporterWriter struct {
type Writer struct {
importer *importer
engineUUID uuid.UUID
}

func (w *ImporterWriter) Close() error {
func (w *Writer) Close() error {
return nil
}

func (w *ImporterWriter) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error {
func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error {
return w.importer.WriteRows(ctx, w.engineUUID, tableName, columnNames, ts, rows)
}
14 changes: 12 additions & 2 deletions pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"

"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/log"

"go.uber.org/zap"
)

// invalidIterator is a trimmed down Iterator type which is invalid.
Expand Down Expand Up @@ -182,11 +185,18 @@ func newSession(options *SessionOptions) *session {
vars.SQLMode = sqlMode
if options.SysVars != nil {
for k, v := range options.SysVars {
vars.SetSystemVar(k, v)
if err := vars.SetSystemVar(k, v); err != nil {
log.L().DPanic("new session: failed to set system var",
log.ShortError(err),
zap.String("key", k))
}
}
}
vars.StmtCtx.TimeZone = vars.Location()
vars.SetSystemVar("timestamp", strconv.FormatInt(options.Timestamp, 10))
if err := vars.SetSystemVar("timestamp", strconv.FormatInt(options.Timestamp, 10)); err != nil {
log.L().Warn("new session: failed to set timestamp",
log.ShortError(err))
}
vars.TxnCtx = nil

s := &session{
Expand Down
47 changes: 29 additions & 18 deletions pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func autoRandomIncrementBits(col *table.Column, randomBits int) int {
incrementalBits := typeBitsLength - randomBits
hasSignBit := !mysql.HasUnsignedFlag(col.Flag)
if hasSignBit {
incrementalBits -= 1
incrementalBits--
}
return incrementalBits
}
Expand Down Expand Up @@ -211,11 +211,13 @@ func (row RowArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error
return err
}
}
encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error {
if err := encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error {
enc.AddString("kind", kindStr[kind])
enc.AddString("val", log.RedactString(str))
return nil
}))
})); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -277,8 +279,9 @@ func MakeRowFromKvPairs(pairs []common.KvPair) Row {
// KvPairsFromRows converts a Rows instance constructed from MakeRowsFromKvPairs
// back into a slice of KvPair. This method panics if the Rows is not
// constructed in such way.
// nolint:golint // kv.KvPairsFromRows sounds good.
func KvPairsFromRows(rows Rows) []common.KvPair {
return []common.KvPair(rows.(kvPairs))
return rows.(kvPairs)
}

// Encode a row of data into KV pairs.
Expand All @@ -295,6 +298,7 @@ func (kvcodec *tableKVEncoder) Encode(

var value types.Datum
var err error
//nolint:prealloc // This is a placeholder.
var record []types.Datum

if kvcodec.recordCache != nil {
Expand All @@ -308,27 +312,28 @@ func (kvcodec *tableKVEncoder) Encode(
j := columnPermutation[i]
isAutoIncCol := mysql.HasAutoIncrementFlag(col.Flag)
isPk := mysql.HasPriKeyFlag(col.Flag)
if j >= 0 && j < len(row) {
switch {
case j >= 0 && j < len(row):
value, err = table.CastValue(kvcodec.se, row[j], col.ToInfo(), false, false)
if err == nil {
value, err = col.HandleBadNull(value, kvcodec.se.vars.StmtCtx)
}
} else if isAutoIncCol {
case isAutoIncCol:
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
} else if isAutoRandom && isPk {
case isAutoRandom && isPk:
var val types.Datum
if mysql.HasUnsignedFlag(col.Flag) {
val = types.NewUintDatum(uint64(kvcodec.autoRandomHeaderBits | rowID))
} else {
val = types.NewIntDatum(kvcodec.autoRandomHeaderBits | rowID)
}
value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false)
} else if col.IsGenerated() {
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
} else {
default:
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
}
if err != nil {
Expand All @@ -339,10 +344,14 @@ func (kvcodec *tableKVEncoder) Encode(

if isAutoRandom && isPk {
incrementalBits := autoRandomIncrementBits(col, int(kvcodec.tbl.Meta().AutoRandomBits))
kvcodec.tbl.RebaseAutoID(kvcodec.se, value.GetInt64()&((1<<incrementalBits)-1), false, autoid.AutoRandomType)
if err := kvcodec.tbl.RebaseAutoID(kvcodec.se, value.GetInt64()&((1<<incrementalBits)-1), false, autoid.AutoRandomType); err != nil {
return nil, errors.Trace(err)
}
}
if isAutoIncCol {
kvcodec.tbl.RebaseAutoID(kvcodec.se, value.GetInt64(), false, autoid.AutoIncrementType)
if err := kvcodec.tbl.RebaseAutoID(kvcodec.se, value.GetInt64(), false, autoid.AutoIncrementType); err != nil {
return nil, errors.Trace(err)
}
}
}

Expand All @@ -357,7 +366,9 @@ func (kvcodec *tableKVEncoder) Encode(
return nil, logKVConvertFailed(logger, row, j, ExtraHandleColumnInfo, err)
}
record = append(record, value)
kvcodec.tbl.RebaseAutoID(kvcodec.se, value.GetInt64(), false, autoid.RowIDAllocType)
if err := kvcodec.tbl.RebaseAutoID(kvcodec.se, value.GetInt64(), false, autoid.RowIDAllocType); err != nil {
return nil, errors.Trace(err)
}
}

if len(kvcodec.genCols) > 0 {
Expand Down Expand Up @@ -414,28 +425,28 @@ func (kvs kvPairs) ClassifyAndAppend(
*indices = indexKVs
}

func (totalKVs kvPairs) SplitIntoChunks(splitSize int) []Rows {
if len(totalKVs) == 0 {
func (kvs kvPairs) SplitIntoChunks(splitSize int) []Rows {
if len(kvs) == 0 {
return nil
}

res := make([]Rows, 0, 1)
i := 0
cumSize := 0

for j, pair := range totalKVs {
for j, pair := range kvs {
size := len(pair.Key) + len(pair.Val)
if i < j && cumSize+size > splitSize {
res = append(res, kvPairs(totalKVs[i:j]))
res = append(res, kvs[i:j])
i = j
cumSize = 0
}
cumSize += size
}

return append(res, kvPairs(totalKVs[i:]))
return append(res, kvs[i:])
}

func (kvs kvPairs) Clear() Rows {
return kvPairs(kvs[:0])
return kvs[:0]
}
9 changes: 5 additions & 4 deletions pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *kvSuite) TestEncode(c *C) {
types.NewIntDatum(1),
types.NewStringDatum("invalid-pk"),
}
pairs, err = strictMode.Encode(logger, rowsWithPk, 2, []int{0, 1})
_, err = strictMode.Encode(logger, rowsWithPk, 2, []int{0, 1})
c.Assert(err, ErrorMatches, "failed to cast value as bigint\\(20\\) for column `_tidb_rowid`.*Truncated.*")

rowsWithPk2 := []types.Datum{
Expand All @@ -117,7 +117,7 @@ func (s *kvSuite) TestEncode(c *C) {
Timestamp: 1234567891,
})
c.Assert(err, IsNil)
pairs, err = mockMode.Encode(logger, rowsWithPk2, 2, []int{0, 1})
_, err = mockMode.Encode(logger, rowsWithPk2, 2, []int{0, 1})
c.Assert(err, ErrorMatches, "mock error")

// Non-strict mode
Expand Down Expand Up @@ -214,9 +214,9 @@ func (s *kvSuite) TestEncodeTimestamp(c *C) {
}))
}

func mockTableInfo(c *C, createSql string) *model.TableInfo {
func mockTableInfo(c *C, createSQL string) *model.TableInfo {
parser := parser.New()
node, err := parser.ParseOneStmt(createSql, "", "")
node, err := parser.ParseOneStmt(createSQL, "", "")
c.Assert(err, IsNil)
sctx := mock.NewContext()
info, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1)
Expand Down Expand Up @@ -398,6 +398,7 @@ func (s *benchSQL2KVSuite) SetUpTest(c *C) {
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tableInfo)
c.Assert(err, IsNil)
s.encoder, err = NewTableKVEncoder(tbl, &SessionOptions{SysVars: map[string]string{"tidb_row_format_version": "2"}})
c.Assert(err, IsNil)
s.logger = log.Logger{Logger: zap.NewNop()}

// Prepare the row to insert.
Expand Down
Loading