Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
PROTOC ?= $(shell which protoc)
PROTOS := $(shell find $(shell pwd) -type f -name '*.proto' -print)
CWD := $(shell pwd)
TOOLS := $(CWD)/tools/bin
PACKAGES := go list ./... | grep -vE 'vendor|test|proto|diff|bin|fuzz'
PACKAGES := go list ./... | grep -vE 'vendor|tools'
COVERED_PACKAGES := $(PACKAGES) | grep -vE 'mock|tests|checkpointspb'
PACKAGE_DIRECTORIES := $(PACKAGES) | sed 's/github.com\/pingcap\/br\/*//'
PACKAGE_FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|\.pb\.go|lightning/mock|res_vfsdata')
PACKAGE_FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|\.pb\.go|mock|tools|res_vfsdata')
CHECKER := awk '{ print } END { if (NR > 0) { exit 1 } }'

BR_PKG := github.com/pingcap/br
Expand Down Expand Up @@ -44,10 +44,10 @@ GOTEST := CGO_ENABLED=1 GO111MODULE=on go test -ldflags '$(LDFLAGS)'
PREPARE_MOD := cp go.mod1 go.mod && cp go.sum1 go.sum
FINISH_MOD := cp go.mod go.mod1 && cp go.sum go.sum1

RACE_FLAG =
RACEFLAG =
ifeq ("$(WITH_RACE)", "1")
RACE_FLAG = -race
GOBUILD = CGO_ENABLED=1 GO111MODULE=on $(GO) build -ldflags '$(LDFLAGS)'
RACEFLAG = -race
GOBUILD = CGO_ENABLED=1 GO111MODULE=on $(GO) build -ldflags '$(LDFLAGS)'
endif

all: build check test
Expand All @@ -64,7 +64,7 @@ finish-prepare:
@rm tmp_parser.go

data_parsers: tools pkg/lightning/mydump/parser_generated.go web
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOPATH)/src" pkg/lightning/checkpoints/file_checkpoints.proto --gogofaster_out=.
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOPATH)/src" pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
$(TOOLS)/vfsgendev -source='"github.com/pingcap/br/pkg/lightning/web".Res' && mv res_vfsdata.go pkg/lightning/web/

web:
Expand All @@ -78,15 +78,15 @@ br:

lightning_for_web:
$(PREPARE_MOD)
$(GOBUILD) $(RACE_FLAG) -tags dev -o $(LIGHTNING_BIN) cmd/tidb-lightning/main.go
$(GOBUILD) $(RACEFLAG) -tags dev -o $(LIGHTNING_BIN) cmd/tidb-lightning/main.go

lightning:
$(PREPARE_MOD)
$(GOBUILD) $(RACE_FLAG) -o $(LIGHTNING_BIN) cmd/tidb-lightning/main.go
$(GOBUILD) $(RACEFLAG) -o $(LIGHTNING_BIN) cmd/tidb-lightning/main.go

lightning-ctl:
$(PREPARE_MOD)
$(GOBUILD) $(RACE_FLAG) -o $(LIGHTNING_CTL_BIN) cmd/tidb-lightning-ctl/main.go
$(GOBUILD) $(RACEFLAG) -o $(LIGHTNING_CTL_BIN) cmd/tidb-lightning-ctl/main.go

build_for_integration_test:
$(PREPARE_MOD)
Expand All @@ -107,7 +107,7 @@ build_for_integration_test:
$(GOBUILD) $(RACEFLAG) -o bin/gc tests/br_z_gc_safepoint/*.go && \
$(GOBUILD) $(RACEFLAG) -o bin/oauth tests/br_gcs/*.go && \
$(GOBUILD) $(RACEFLAG) -o bin/rawkv tests/br_rawkv/*.go && \
$(GOBUILD) $(RACE_FLAG) -o bin/parquet_gen tests/lightning_checkpoint_parquet/*.go \
$(GOBUILD) $(RACEFLAG) -o bin/parquet_gen tests/lightning_checkpoint_parquet/*.go \
) || (make failpoint-disable && exit 1)
@make failpoint-disable

Expand All @@ -121,7 +121,8 @@ testcover: tools
mkdir -p "$(TEST_DIR)"
$(PREPARE_MOD)
@make failpoint-enable
$(GOTEST) -cover -covermode=count -coverprofile="$(TEST_DIR)/cov.unit.out" $$($(PACKAGES)) || ( make failpoint-disable && exit 1 )
$(GOTEST) -cover -covermode=count -coverprofile="$(TEST_DIR)/cov.unit.out" \
$$($(COVERED_PACKAGES)) || ( make failpoint-disable && exit 1 )
@make failpoint-disable

integration_test: bins build build_for_integration_test
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/pingcap/parser/mysql"

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/mock"
"github.com/pingcap/br/pkg/mock"
)

type backendSuite struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/backend/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/mock"
"github.com/pingcap/br/pkg/mock"
)

type importerSuite struct {
Expand Down
31 changes: 16 additions & 15 deletions pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.uber.org/zap"
"modernc.org/mathutil"

"github.com/pingcap/br/pkg/lightning/checkpoints/checkpointspb"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/config"
"github.com/pingcap/br/pkg/lightning/log"
Expand Down Expand Up @@ -901,16 +902,16 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi

type FileCheckpointsDB struct {
lock sync.Mutex // we need to ensure only a thread can access to `checkpoints` at a time
checkpoints CheckpointsModel
checkpoints checkpointspb.CheckpointsModel
path string
}

func NewFileCheckpointsDB(path string) *FileCheckpointsDB {
cpdb := &FileCheckpointsDB{
path: path,
checkpoints: CheckpointsModel{
TaskCheckpoint: &TaskCheckpointModel{},
Checkpoints: map[string]*TableCheckpointModel{},
checkpoints: checkpointspb.CheckpointsModel{
TaskCheckpoint: &checkpointspb.TaskCheckpointModel{},
Checkpoints: map[string]*checkpointspb.TableCheckpointModel{},
},
}
// ignore all errors -- file maybe not created yet (and it is fine).
Expand All @@ -923,15 +924,15 @@ func NewFileCheckpointsDB(path string) *FileCheckpointsDB {
// FIXME: patch for empty map may need initialize manually, because currently
// FIXME: a map of zero size -> marshall -> unmarshall -> become nil, see checkpoint_test.go
if cpdb.checkpoints.Checkpoints == nil {
cpdb.checkpoints.Checkpoints = map[string]*TableCheckpointModel{}
cpdb.checkpoints.Checkpoints = map[string]*checkpointspb.TableCheckpointModel{}
}
for _, table := range cpdb.checkpoints.Checkpoints {
if table.Engines == nil {
table.Engines = map[int32]*EngineCheckpointModel{}
table.Engines = map[int32]*checkpointspb.EngineCheckpointModel{}
}
for _, engine := range table.Engines {
if engine.Chunks == nil {
engine.Chunks = map[string]*ChunkCheckpointModel{}
engine.Chunks = map[string]*checkpointspb.ChunkCheckpointModel{}
}
}
}
Expand Down Expand Up @@ -959,7 +960,7 @@ func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Confi
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

cpdb.checkpoints.TaskCheckpoint = &TaskCheckpointModel{
cpdb.checkpoints.TaskCheckpoint = &checkpointspb.TaskCheckpointModel{
TaskId: cfg.TaskID,
SourceDir: cfg.Mydumper.SourceDir,
Backend: cfg.TikvImporter.Backend,
Expand All @@ -972,16 +973,16 @@ func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Confi
}

if cpdb.checkpoints.Checkpoints == nil {
cpdb.checkpoints.Checkpoints = make(map[string]*TableCheckpointModel)
cpdb.checkpoints.Checkpoints = make(map[string]*checkpointspb.TableCheckpointModel)
}

for _, db := range dbInfo {
for _, table := range db.Tables {
tableName := common.UniqueTable(db.Name, table.Name)
if _, ok := cpdb.checkpoints.Checkpoints[tableName]; !ok {
cpdb.checkpoints.Checkpoints[tableName] = &TableCheckpointModel{
cpdb.checkpoints.Checkpoints[tableName] = &checkpointspb.TableCheckpointModel{
Status: uint32(CheckpointStatusLoaded),
Engines: map[int32]*EngineCheckpointModel{},
Engines: map[int32]*checkpointspb.EngineCheckpointModel{},
TableID: table.ID,
}
}
Expand Down Expand Up @@ -1025,7 +1026,7 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC

tableModel, ok := cpdb.checkpoints.Checkpoints[tableName]
if !ok {
tableModel = &TableCheckpointModel{}
tableModel = &checkpointspb.TableCheckpointModel{}
}

cp := &TableCheckpoint{
Expand Down Expand Up @@ -1086,15 +1087,15 @@ func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableN

tableModel := cpdb.checkpoints.Checkpoints[tableName]
for engineID, engine := range checkpoints {
engineModel := &EngineCheckpointModel{
engineModel := &checkpointspb.EngineCheckpointModel{
Status: uint32(CheckpointStatusLoaded),
Chunks: make(map[string]*ChunkCheckpointModel),
Chunks: make(map[string]*checkpointspb.ChunkCheckpointModel),
}
for _, value := range engine.Chunks {
key := value.Key.String()
chunk, ok := engineModel.Chunks[key]
if !ok {
chunk = &ChunkCheckpointModel{
chunk = &checkpointspb.ChunkCheckpointModel{
Path: value.Key.Path,
Offset: value.Key.Offset,
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/lightning/checkpoints/checkpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

. "github.com/pingcap/check"

"github.com/pingcap/br/pkg/lightning/checkpoints/checkpointspb"
"github.com/pingcap/br/pkg/lightning/mydump"
"github.com/pingcap/br/pkg/lightning/verification"
)
Expand Down Expand Up @@ -294,9 +295,9 @@ func (s *checkpointSuite) TestApplyDiff(c *C) {
func (s *checkpointSuite) TestCheckpointMarshallUnmarshall(c *C) {
path := filepath.Join(c.MkDir(), "filecheckpoint")
fileChkp := NewFileCheckpointsDB(path)
fileChkp.checkpoints.Checkpoints["a"] = &TableCheckpointModel{
fileChkp.checkpoints.Checkpoints["a"] = &checkpointspb.TableCheckpointModel{
Status: uint32(CheckpointStatusLoaded),
Engines: map[int32]*EngineCheckpointModel{},
Engines: map[int32]*checkpointspb.EngineCheckpointModel{},
}
fileChkp.Close()

Expand Down
Loading