Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
2891caa
add log restore cmd for cdc log
3pointer Aug 18, 2020
0e345e0
tmp
3pointer Aug 19, 2020
37ff834
move encoder from lightning to br
3pointer Aug 19, 2020
0c2de08
add puller
3pointer Aug 20, 2020
d420b26
implement write and ingest
3pointer Aug 20, 2020
29cc906
define flags
3pointer Aug 24, 2020
b42680d
add log
3pointer Aug 24, 2020
f381b95
fix decoder bug
3pointer Aug 24, 2020
6d48220
fix create ddl bug
3pointer Aug 24, 2020
182c0a4
Merge branch 'master' into log_restore
3pointer Aug 24, 2020
d5de731
adapt walkDir
3pointer Aug 24, 2020
87bf634
fix can't found schema in domain
3pointer Aug 27, 2020
85e0a1e
fix panic
3pointer Aug 28, 2020
92f3021
fix next panic
3pointer Aug 28, 2020
40239ee
fix write kv bug
3pointer Aug 28, 2020
5d962d3
more log
3pointer Aug 31, 2020
5532779
fix write failed
3pointer Aug 31, 2020
d9da188
fix rowID
3pointer Aug 31, 2020
beae903
upate datum
3pointer Aug 31, 2020
c96bc69
fix lost data
3pointer Aug 31, 2020
f047506
fix size
3pointer Aug 31, 2020
0f823c8
format
3pointer Aug 31, 2020
1c7268b
fix check
3pointer Aug 31, 2020
d6bdbcb
fix check
3pointer Aug 31, 2020
2205982
fix Hound
3pointer Aug 31, 2020
7e74044
fix lint
3pointer Sep 1, 2020
d040b90
Merge branch 'master' into log_restore
3pointer Sep 1, 2020
d0ba5da
add decoder test
3pointer Sep 1, 2020
a06b788
fix lint
3pointer Sep 1, 2020
06ca099
make some settings to a config
3pointer Sep 1, 2020
d5593c2
address comment
3pointer Sep 2, 2020
92aaad4
Update pkg/restore/cdclog/decoder.go
3pointer Sep 2, 2020
a57beb5
address comments
3pointer Sep 2, 2020
25fb5e5
fix build
3pointer Sep 2, 2020
4024d21
reload table info in tableBuffer after ddl executed
3pointer Sep 2, 2020
1adeb58
address comments
3pointer Sep 3, 2020
6f68643
remove useless create ddl logic
3pointer Sep 3, 2020
1309566
reset kv encoder
3pointer Sep 3, 2020
eeb4c55
recreate kv encoder
3pointer Sep 3, 2020
6afe56e
add primary table id to do database level ddl
3pointer Sep 4, 2020
9ad5bd9
add allocator to adapt specific table
3pointer Sep 7, 2020
5616667
fix check
3pointer Sep 8, 2020
1828d68
Merge branch 'master' into log_restore
3pointer Sep 8, 2020
bc872b7
adapt file sink
3pointer Sep 8, 2020
a991db1
log restore: add delete flag deleted kv pair
3pointer Sep 9, 2020
8cb0d48
Merge branch 'master' into log_restore
3pointer Sep 10, 2020
652efc1
update kvproto to lastst
3pointer Sep 10, 2020
fffcd5f
address comment
3pointer Sep 14, 2020
1933674
address comment
3pointer Sep 14, 2020
8a05521
add comment for ddl file name
3pointer Sep 14, 2020
432ded5
fix build
3pointer Sep 14, 2020
bf17a1f
address some comments
3pointer Sep 16, 2020
1b6de45
reuse br importClient
3pointer Sep 16, 2020
32801d9
fix check
3pointer Sep 16, 2020
84d0d56
try fix parseQuoteName
3pointer Sep 16, 2020
a5ccc79
use regexp to parse quote name
3pointer Sep 17, 2020
8123cf4
remove Rows interface
3pointer Sep 17, 2020
674d144
Merge branch 'master' into log_restore
3pointer Sep 17, 2020
fa60275
fill the docs
3pointer Sep 17, 2020
1aed603
address comment
3pointer Sep 17, 2020
d832f29
hidden log command
3pointer Sep 17, 2020
cbd2480
update code folder
3pointer Sep 18, 2020
eed7d1c
Merge remote-tracking branch 'origin/master' into log_restore
3pointer Sep 18, 2020
6f0487d
mv common code to kv pkg
3pointer Sep 23, 2020
e3a69c1
Merge branch 'master' into log_restore
3pointer Sep 23, 2020
f73ef64
adapt master code
3pointer Sep 23, 2020
8454191
address comment
3pointer Sep 24, 2020
3378ccf
address comment
3pointer Sep 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
return nil
}

func runLogRestoreCommand(command *cobra.Command) error {
cfg := task.LogRestoreConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
command.SilenceUsage = false
return err
}
if err := task.RunLogRestore(GetDefaultContext(), tidbGlue, &cfg); err != nil {
log.Error("failed to restore", zap.Error(err))
return err
}
return nil
}

func runRestoreRawCommand(command *cobra.Command, cmdName string) error {
cfg := task.RestoreRawConfig{
RawKvConfig: task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}},
Expand Down Expand Up @@ -80,6 +93,7 @@ func NewRestoreCommand() *cobra.Command {
newFullRestoreCommand(),
newDbRestoreCommand(),
newTableRestoreCommand(),
newLogRestoreCommand(),
newRawRestoreCommand(),
newTiflashReplicaRestoreCommand(),
)
Expand Down Expand Up @@ -124,6 +138,21 @@ func newTableRestoreCommand() *cobra.Command {
return command
}

func newLogRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "cdclog",
Short: "restore data from cdc log backup",
RunE: func(cmd *cobra.Command, _ []string) error {
return runLogRestoreCommand(cmd)
},
}
task.DefineFilterFlags(command)
task.DefineLogRestoreFlags(command)
Comment thread
kennytm marked this conversation as resolved.
// TODO remove hidden if it's ready.
command.Hidden = true
return command
}

func newTiflashReplicaRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "tiflash-replica",
Expand Down
19 changes: 10 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,26 @@ require (
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200729012136-4e113ddee29e
github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c
github.com/pingcap/kvproto v0.0.0-20200828054126-d677e6fd224a
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4
github.com/pingcap/tidb v1.1.0-beta.0.20200831085451-438945d2948e
github.com/pingcap/errors v0.11.5-0.20200902104258-eba4f1d8f6de
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20200910095337-6b893f12be43
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/parser v0.0.0-20200909072241-6dac7bb703e2
github.com/pingcap/tidb v1.1.0-beta.0.20200910052409-5d52a34b2476
github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/common v0.9.1
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/tikv/pd v1.1.0-beta.0.20200818122340-ef1a4e920b2f
github.com/tikv/pd v1.1.0-beta.0.20200820084926-bcfa77a7a593
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0
go.uber.org/zap v1.16.0
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
google.golang.org/api v0.15.1
google.golang.org/grpc v1.26.0
)
51 changes: 44 additions & 7 deletions go.sum

Large diffs are not rendered by default.

213 changes: 213 additions & 0 deletions pkg/cdclog/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cdclog

import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/kv"
)

// TableBuffer represents the kv buffer of this table.
// we restore one tableBuffer in one goroutine.
// this is the concurrent unit of log restore.
type TableBuffer struct {
KvPairs []kv.Row
count int
size int64

KvEncoder kv.Encoder
tableInfo table.Table
allocator autoid.Allocators

flushKVSize int64
flushKVPairs int

colNames []string
colPerm []int
}

func newKVEncoder(allocators autoid.Allocators, tbl table.Table) (kv.Encoder, error) {
encTable, err := table.TableFromMeta(allocators, tbl.Meta())
if err != nil {
return nil, errors.Trace(err)
}
return kv.NewTableKVEncoder(encTable, &kv.SessionOptions{
Timestamp: time.Now().Unix(),
// TODO get the version from TiDB cluster
// currently TiDB only support v1 and v2, and since 4.0
// the default RowFormatVersion is 2, so I think
// we can implement the row version retrieve from cluster in the future
// when TiDB decide to support v3 RowFormatVersion.
RowFormatVersion: "2",
}), nil
}

// NewTableBuffer creates TableBuffer.
func NewTableBuffer(tbl table.Table, allocators autoid.Allocators, flushKVPairs int, flushKVSize int64) *TableBuffer {
tb := &TableBuffer{
KvPairs: make([]kv.Row, 0, flushKVPairs),
flushKVPairs: flushKVPairs,
flushKVSize: flushKVSize,
}
if tbl != nil {
tb.ReloadMeta(tbl, allocators)
}
return tb
}

// TableInfo returns the table info of this buffer.
func (t *TableBuffer) TableInfo() table.Table {
return t.tableInfo
}

// TableID returns the table id of this buffer.
func (t *TableBuffer) TableID() int64 {
if t.tableInfo != nil {
return t.tableInfo.Meta().ID
}
return 0
}

// ReloadMeta reload columns after
// 1. table buffer created.
// 2. every ddl executed.
func (t *TableBuffer) ReloadMeta(tbl table.Table, allocator autoid.Allocators) {
columns := tbl.Meta().Cols()
colNames := make([]string, 0, len(columns))
colPerm := make([]int, 0, len(columns)+1)

for i, col := range columns {
colNames = append(colNames, col.Name.String())
colPerm = append(colPerm, i)
}
if kv.TableHasAutoRowID(tbl.Meta()) {
colPerm = append(colPerm, -1)
}
if t.allocator == nil {
t.allocator = allocator
}
t.tableInfo = tbl
t.colNames = colNames
t.colPerm = colPerm
// reset kv encoder after meta changed
t.KvEncoder = nil
}

func (t *TableBuffer) translateToDatum(row map[string]Column) ([]types.Datum, error) {
cols := make([]types.Datum, 0, len(row))
for _, col := range t.colNames {
val, err := row[col].ToDatum()
if err != nil {
return nil, err
}
log.Debug("translate to datum", zap.String("col", col), zap.Stringer("val", val))
cols = append(cols, val)
}
return cols, nil
}

func (t *TableBuffer) appendRow(
row map[string]Column,
item *SortItem,
encodeFn func(row []types.Datum,
rowID int64,
columnPermutation []int) (kv.Row, int, error),
) error {
cols, err := t.translateToDatum(row)
if err != nil {
return err
}
pair, size, err := encodeFn(cols, item.RowID, t.colPerm)
if err != nil {
return errors.Trace(err)
}
t.KvPairs = append(t.KvPairs, pair)
t.size += int64(size)
t.count++
return nil
}

// Append appends the item to this buffer.
func (t *TableBuffer) Append(item *SortItem) error {
var err error
log.Debug("Append item to buffer",
zap.Stringer("table", t.tableInfo.Meta().Name),
zap.Any("item", item),
)
row := item.Data.(*MessageRow)

if t.KvEncoder == nil {
// lazy create kv encoder
log.Debug("create kv encoder lazily",
zap.Any("alloc", t.allocator), zap.Any("tbl", t.tableInfo))
t.KvEncoder, err = newKVEncoder(t.allocator, t.tableInfo)
if err != nil {
return errors.Trace(err)
}
}

if row.PreColumns != nil {
// remove old keys
log.Debug("process update event", zap.Any("row", row))
err := t.appendRow(row.PreColumns, item, t.KvEncoder.RemoveRecord)
if err != nil {
return err
}
}
if row.Update != nil {
// Add new columns
if row.PreColumns == nil {
log.Debug("process insert event", zap.Any("row", row))
}
err := t.appendRow(row.Update, item, t.KvEncoder.AddRecord)
if err != nil {
return err
}
}
if row.Delete != nil {
// Remove current columns
log.Debug("process delete event", zap.Any("row", row))
err := t.appendRow(row.Delete, item, t.KvEncoder.RemoveRecord)
if err != nil {
return err
}
}
return nil
}

// ShouldApply tells whether we should flush memory kv buffer to storage.
func (t *TableBuffer) ShouldApply() bool {
// flush when reached flush kv len or flush size
return t.size >= t.flushKVSize || t.count >= t.flushKVPairs
}

// IsEmpty tells buffer is empty.
func (t *TableBuffer) IsEmpty() bool {
return t.size == 0
}

// Clear reset the buffer.
func (t *TableBuffer) Clear() {
t.KvPairs = t.KvPairs[:0]
t.count = 0
t.size = 0
}
Loading