Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"sync"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -32,10 +30,13 @@ import (
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

const (
Expand All @@ -62,7 +63,15 @@ func NewCapture(pdEndpoints []string) (c *Capture, err error) {
Endpoints: pdEndpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
Multiplier: 1.1,
Jitter: 0.1,
MaxDelay: 3 * time.Second,
},
MinConnectTimeout: 3 * time.Second,
}),
},
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cdc/capture_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package cdc
import (
"context"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/mvccpb"
)

var captureEinfoKeyPrefix = kv.EtcdKeyBase + "/capture/info"
Expand Down
6 changes: 3 additions & 3 deletions cdc/capture_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
"sync/atomic"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/mvcc"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/util"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/mvcc"
"golang.org/x/sync/errgroup"
)

Expand Down
61 changes: 52 additions & 9 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/ticdc/cdc/schema"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
)

var (
Expand Down Expand Up @@ -180,14 +182,29 @@ func decodeMetaKey(ek []byte) (meta, error) {
}

// decodeRow decodes a byte slice into datums with a existing row map.
func decodeRow(b []byte, recordID int64, tableInfo *schema.TableInfo) (map[int64]types.Datum, error) {
if len(b) == 0 {
if tableInfo.PKIsHandle {
id, pkValue, err := fetchHandleValue(tableInfo, recordID)
if err != nil {
return nil, errors.Trace(err)
}
return map[int64]types.Datum{id: *pkValue}, nil
}
Comment thread
amyangfei marked this conversation as resolved.
return map[int64]types.Datum{}, nil
}
if rowcodec.IsNewFormat(b) {
return decodeRowV2(b, recordID, tableInfo)
}
return decodeRowV1(b, recordID, tableInfo)
}

// decodeRowV1 decodes value data using old encoding format.
// Row layout: colID1, value1, colID2, value2, .....
func decodeRow(b []byte) (map[int64]types.Datum, error) {
func decodeRowV1(b []byte, recordID int64, tableInfo *schema.TableInfo) (map[int64]types.Datum, error) {
row := make(map[int64]types.Datum)
if b == nil {
return row, nil
}
if len(b) == 1 && b[0] == codec.NilFlag {
return row, nil
b = b[1:]
}
var err error
var data []byte
Expand All @@ -213,11 +230,39 @@ func decodeRow(b []byte) (map[int64]types.Datum, error) {
return nil, errors.Trace(err)
}

row[id] = v
// unflatten value
colInfo, exist := tableInfo.GetColumnInfo(id)
if !exist {
// can not find column info, ignore this column because the column should be in WRITE ONLY state
continue
}
fieldType := &colInfo.FieldType
datum, err := unflatten(v, fieldType)
if err != nil {
return nil, errors.Trace(err)
}
row[id] = datum
}

if tableInfo.PKIsHandle {
id, pkValue, err := fetchHandleValue(tableInfo, recordID)
if err != nil {
return nil, errors.Trace(err)
}
row[id] = *pkValue
}
return row, nil
}

// decodeRowV2 decodes value data using new encoding format.
// Ref: https://github.com/pingcap/tidb/pull/12634
// https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md
func decodeRowV2(data []byte, recordID int64, tableInfo *schema.TableInfo) (map[int64]types.Datum, error) {
handleColID, reqCols := tableInfo.GetRowColInfos()
decoder := rowcodec.NewDatumMapDecoder(reqCols, handleColID, time.UTC)
return decoder.DecodeToDatumMap(data, recordID, nil)
}

// unflatten converts a raw datum to a column datum.
func unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) {
if datum.IsNull() {
Expand All @@ -233,9 +278,7 @@ func unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) {
mysql.TypeString:
return datum, nil
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp:
var t types.Time
t.Type = ft.Tp
t.Fsp = int8(ft.Decimal)
t := types.NewTime(types.ZeroCoreTime, ft.Tp, int8(ft.Decimal))
var err error
err = t.FromPackedUint(datum.GetUint64())
if err != nil {
Expand Down
43 changes: 15 additions & 28 deletions cdc/entry/kv_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,42 +105,29 @@ func isDistinct(index *timodel.IndexInfo, indexValue []types.Datum) bool {
return false
}

func (row *rowKVEntry) unflatten(tableInfo *schema.TableInfo) error {
if tableInfo.ID != row.TableID {
return errors.New("wrong table info in unflatten")
}
for colID, v := range row.Row {
colInfo, exist := tableInfo.GetColumnInfo(colID)
if !exist {
log.Info("can not find column info, ignore this column because the column should be in WRITE ONLY state", zap.Int64("colID", colID), zap.Uint64("ts", row.Ts))
delete(row.Row, colID)
continue
}
fieldType := &colInfo.FieldType
datum, err := unflatten(v, fieldType)
if err != nil {
return errors.Trace(err)
}
row.Row[colID] = datum
}
return nil
}

func unmarshal(raw *model.RawKVEntry) (kvEntry, error) {
func (m *Mounter) unmarshal(raw *model.RawKVEntry) (kvEntry, error) {
switch {
case bytes.HasPrefix(raw.Key, tablePrefix):
return unmarshalTableKVEntry(raw)
return m.unmarshalTableKVEntry(raw)
case bytes.HasPrefix(raw.Key, metaPrefix) && raw.OpType == model.OpTypePut:
return unmarshalMetaKVEntry(raw)
return m.unmarshalMetaKVEntry(raw)
}
return &unknownKVEntry{*raw}, nil
return nil, nil
}

func unmarshalTableKVEntry(raw *model.RawKVEntry) (kvEntry, error) {
func (m *Mounter) unmarshalTableKVEntry(raw *model.RawKVEntry) (kvEntry, error) {
key, tableID, err := decodeTableID(raw.Key)
if err != nil {
return nil, errors.Trace(err)
}
tableInfo, exist := m.schemaStorage.TableByID(tableID)
if !exist {
if m.schemaStorage.IsTruncateTableID(tableID) {
log.Debug("skip the DML of truncated table", zap.Uint64("ts", raw.Ts), zap.Int64("tableID", tableID))
return nil, nil
}
return nil, errors.NotFoundf("table in schema storage, id: %d", tableID)
}
switch {
case bytes.HasPrefix(key, recordPrefix):
key, recordID, err := decodeRecordID(key)
Expand All @@ -150,7 +137,7 @@ func unmarshalTableKVEntry(raw *model.RawKVEntry) (kvEntry, error) {
if len(key) != 0 {
return nil, errors.New("invalid record key")
}
row, err := decodeRow(raw.Value)
row, err := decodeRow(raw.Value, recordID, tableInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -196,7 +183,7 @@ const (
tableMetaPrefix = "Table:"
)

func unmarshalMetaKVEntry(raw *model.RawKVEntry) (kvEntry, error) {
func (m *Mounter) unmarshalMetaKVEntry(raw *model.RawKVEntry) (kvEntry, error) {
meta, err := decodeMetaKey(raw.Key)
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading