From eb475a41d01c86185c18e89133059700750c7884 Mon Sep 17 00:00:00 2001 From: joccau Date: Fri, 2 Dec 2022 12:35:58 +0800 Subject: [PATCH 1/4] debug Signed-off-by: joccau --- br/pkg/stream/rewrite_meta_rawkv.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 40e76a6130358..754a524749959 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -467,6 +467,9 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([ shortValue, needWrite, err := rewrite(rawWriteCFValue.GetShortValue()) if err != nil { + log.Info("", zap.ByteString("write-type", []byte{rawWriteCFValue.GetWriteType()}), + zap.Int32("short-value-len", int32(len(rawWriteCFValue.GetShortValue()))), + zap.ByteString("short-value", rawWriteCFValue.GetShortValue())) return rewriteResult{}, errors.Trace(err) } if !needWrite { From 6406973f1e4627fe07d59d0f694f143440865702 Mon Sep 17 00:00:00 2001 From: joccau Date: Fri, 2 Dec 2022 15:42:18 +0800 Subject: [PATCH 2/4] don't rewrite short-value if the value is rollback record Signed-off-by: joccau --- br/pkg/stream/meta_kv.go | 10 ++++++++++ br/pkg/stream/rewrite_meta_rawkv.go | 15 +++++++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/br/pkg/stream/meta_kv.go b/br/pkg/stream/meta_kv.go index 9d054f0bef454..35d8d0ec73bee 100644 --- a/br/pkg/stream/meta_kv.go +++ b/br/pkg/stream/meta_kv.go @@ -164,6 +164,16 @@ l_for: return nil } +// IsRollback checks whether the value in cf is a `rollback` record. +func (v *RawWriteCFValue) IsRollback() bool { + return v.GetWriteType() == WriteTypeRollback +} + +// IsRollback checks whether the value in cf is a `delete` record. +func (v *RawWriteCFValue) IsDelete() bool { + return v.GetWriteType() == WriteTypeDelete +} + // HasShortValue checks whether short value is stored in write cf. func (v *RawWriteCFValue) HasShortValue() bool { return len(v.shortValue) > 0 diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 754a524749959..7398abdbb2cb9 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -451,13 +451,20 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([ return rewriteResult{}, errors.Trace(err) } - if rawWriteCFValue.t == WriteTypeDelete { + if rawWriteCFValue.IsDelete() { return rewriteResult{ NewValue: value, NeedRewrite: true, Deleted: true, }, nil } + if rawWriteCFValue.IsRollback() { + return rewriteResult{ + NewValue: value, + NeedRewrite: true, + Deleted: false, + }, nil + } if !rawWriteCFValue.HasShortValue() { return rewriteResult{ NewValue: value, @@ -467,9 +474,9 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([ shortValue, needWrite, err := rewrite(rawWriteCFValue.GetShortValue()) if err != nil { - log.Info("", zap.ByteString("write-type", []byte{rawWriteCFValue.GetWriteType()}), - zap.Int32("short-value-len", int32(len(rawWriteCFValue.GetShortValue()))), - zap.ByteString("short-value", rawWriteCFValue.GetShortValue())) + log.Info("failed to rewrite short value", + zap.ByteString("write-type", []byte{rawWriteCFValue.GetWriteType()}), + zap.Int("short-value-len", len(rawWriteCFValue.GetShortValue()))) return rewriteResult{}, errors.Trace(err) } if !needWrite { From 2ddf3855ef87b62f8149d4e376c3ad70923200fe Mon Sep 17 00:00:00 2001 From: joccau Date: Fri, 2 Dec 2022 16:40:24 +0800 Subject: [PATCH 3/4] add more memeber in write cf valud Signed-off-by: joccau --- br/pkg/stream/meta_kv.go | 43 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/br/pkg/stream/meta_kv.go b/br/pkg/stream/meta_kv.go index 35d8d0ec73bee..265ffd63dee2c 100644 --- a/br/pkg/stream/meta_kv.go +++ b/br/pkg/stream/meta_kv.go @@ -111,6 +111,8 @@ const ( flagShortValuePrefix = byte('v') flagOverlappedRollback = byte('R') flagGCFencePrefix = byte('F') + flagLastChangePrefix = byte('l') + flagTxnSourcePrefix = byte('S') ) type RawWriteCFValue struct { @@ -118,8 +120,18 @@ type RawWriteCFValue struct { startTs uint64 shortValue []byte hasOverlappedRollback bool - hasGCFence bool - gcFence uint64 + // + hasGCFence bool + gcFence uint64 + + // + lastChangeTs uint64 + + // + versionsToLastChange uint64 + + // + txnSource uint64 } // ParseFrom decodes the value to get the struct `RawWriteCFValue`. @@ -146,6 +158,10 @@ l_for: switch data[0] { case flagShortValuePrefix: vlen := data[1] + if len(data[2:]) < int(vlen) { + return errors.Annotatef(berrors.ErrInvalidArgument, + "the length of short value is invalid, vlen: %v", int(vlen)) + } v.shortValue = data[2 : vlen+2] data = data[vlen+2:] case flagOverlappedRollback: @@ -157,6 +173,20 @@ l_for: if err != nil { return errors.Annotate(berrors.ErrInvalidArgument, "decode gc fence failed") } + case flagLastChangePrefix: + data, v.lastChangeTs, err = codec.DecodeUint(data[1:]) + if err != nil { + return errors.Annotate(berrors.ErrInvalidArgument, "decode last change ts failed") + } + data, v.versionsToLastChange, err = codec.DecodeUvarint(data) + if err != nil { + return errors.Annotate(berrors.ErrInvalidArgument, "decode versions to last change failed") + } + case flagTxnSourcePrefix: + data, v.txnSource, err = codec.DecodeUvarint(data) + if err != nil { + return errors.Annotate(berrors.ErrInvalidArgument, "decode txn source failed") + } default: break l_for } @@ -214,5 +244,14 @@ func (v *RawWriteCFValue) EncodeTo() []byte { data = append(data, flagGCFencePrefix) data = codec.EncodeUint(data, v.gcFence) } + if v.lastChangeTs > 0 || v.versionsToLastChange > 0 { + data = append(data, flagLastChangePrefix) + data = codec.EncodeUint(data, v.lastChangeTs) + data = codec.EncodeUvarint(data, v.versionsToLastChange) + } + if v.txnSource > 0 { + data = append(data, flagTxnSourcePrefix) + data = codec.EncodeUvarint(data, v.txnSource) + } return data } From 8f61a6fea89450ddbb7d5f450a23d35881236023 Mon Sep 17 00:00:00 2001 From: joccau Date: Fri, 2 Dec 2022 17:16:34 +0800 Subject: [PATCH 4/4] add test cases Signed-off-by: joccau --- br/pkg/stream/meta_kv.go | 19 +++++-- br/pkg/stream/meta_kv_test.go | 101 ++++++++++++++++++++++++++++++++-- 2 files changed, 109 insertions(+), 11 deletions(-) diff --git a/br/pkg/stream/meta_kv.go b/br/pkg/stream/meta_kv.go index 265ffd63dee2c..fb7c2f79f17d1 100644 --- a/br/pkg/stream/meta_kv.go +++ b/br/pkg/stream/meta_kv.go @@ -115,22 +115,29 @@ const ( flagTxnSourcePrefix = byte('S') ) +// RawWriteCFValue represents the value in write columnFamily. +// Detail see line: https://github.com/tikv/tikv/blob/release-6.5/components/txn_types/src/write.rs#L70 type RawWriteCFValue struct { t WriteType startTs uint64 shortValue []byte hasOverlappedRollback bool + + // Records the next version after this version when overlapping rollback + // happens on an already existed commit record. // + // See [`Write::gc_fence`] for more detail. hasGCFence bool gcFence uint64 - // - lastChangeTs uint64 - - // + // The number of versions that need skipping from this record + // to find the latest PUT/DELETE record. + // If versions_to_last_change > 0 but last_change_ts == 0, the key does not + // have a PUT/DELETE record before this write record. + lastChangeTs uint64 versionsToLastChange uint64 - // + // The source of this txn. txnSource uint64 } @@ -183,7 +190,7 @@ l_for: return errors.Annotate(berrors.ErrInvalidArgument, "decode versions to last change failed") } case flagTxnSourcePrefix: - data, v.txnSource, err = codec.DecodeUvarint(data) + data, v.txnSource, err = codec.DecodeUvarint(data[1:]) if err != nil { return errors.Annotate(berrors.ErrInvalidArgument, "decode txn source failed") } diff --git a/br/pkg/stream/meta_kv_test.go b/br/pkg/stream/meta_kv_test.go index eaebf64526243..7a8c5e4fed8b6 100644 --- a/br/pkg/stream/meta_kv_test.go +++ b/br/pkg/stream/meta_kv_test.go @@ -68,29 +68,49 @@ func TestWriteType(t *testing.T) { } func TestWriteCFValueNoShortValue(t *testing.T) { + var ( + ts uint64 = 400036290571534337 + txnSource uint64 = 9527 + ) + buff := make([]byte, 0, 9) - buff = append(buff, byte('P')) - buff = codec.EncodeUvarint(buff, 400036290571534337) + buff = append(buff, WriteTypePut) + buff = codec.EncodeUvarint(buff, ts) + buff = append(buff, flagTxnSourcePrefix) + buff = codec.EncodeUvarint(buff, txnSource) v := new(RawWriteCFValue) err := v.ParseFrom(buff) require.NoError(t, err) + require.False(t, v.IsDelete()) + require.False(t, v.IsRollback()) require.False(t, v.HasShortValue()) + require.False(t, v.hasGCFence) + require.Equal(t, v.lastChangeTs, uint64(0)) + require.Equal(t, v.versionsToLastChange, uint64(0)) + require.Equal(t, v.txnSource, txnSource) encodedBuff := v.EncodeTo() require.True(t, bytes.Equal(buff, encodedBuff)) } func TestWriteCFValueWithShortValue(t *testing.T) { - var ts uint64 = 400036290571534337 - shortValue := []byte("pingCAP") + var ( + ts uint64 = 400036290571534337 + shortValue = []byte("pingCAP") + lastChangeTs uint64 = 9527 + versionsToLastChange uint64 = 95271 + ) buff := make([]byte, 0, 9) - buff = append(buff, byte('P')) + buff = append(buff, WriteTypePut) buff = codec.EncodeUvarint(buff, ts) buff = append(buff, flagShortValuePrefix) buff = append(buff, byte(len(shortValue))) buff = append(buff, shortValue...) + buff = append(buff, flagLastChangePrefix) + buff = codec.EncodeUint(buff, lastChangeTs) + buff = codec.EncodeUvarint(buff, versionsToLastChange) v := new(RawWriteCFValue) err := v.ParseFrom(buff) @@ -99,7 +119,78 @@ func TestWriteCFValueWithShortValue(t *testing.T) { require.True(t, bytes.Equal(v.GetShortValue(), shortValue)) require.False(t, v.hasGCFence) require.False(t, v.hasOverlappedRollback) + require.Equal(t, v.lastChangeTs, lastChangeTs) + require.Equal(t, v.versionsToLastChange, versionsToLastChange) + require.Equal(t, v.txnSource, uint64(0)) data := v.EncodeTo() require.True(t, bytes.Equal(data, buff)) } + +func TestWriteCFValueWithRollback(t *testing.T) { + var ( + ts uint64 = 400036290571534337 + protectedRollbackShortValue = []byte{'P'} + ) + + buff := make([]byte, 0, 9) + buff = append(buff, WriteTypeRollback) + buff = codec.EncodeUvarint(buff, ts) + buff = append(buff, flagShortValuePrefix, byte(len(protectedRollbackShortValue))) + buff = append(buff, protectedRollbackShortValue...) + + v := new(RawWriteCFValue) + err := v.ParseFrom(buff) + require.NoError(t, err) + require.True(t, v.IsRollback()) + require.True(t, v.HasShortValue()) + require.Equal(t, v.GetShortValue(), protectedRollbackShortValue) + require.Equal(t, v.startTs, ts) + require.Equal(t, v.lastChangeTs, uint64(0)) + require.Equal(t, v.versionsToLastChange, uint64(0)) + require.Equal(t, v.txnSource, uint64(0)) + + data := v.EncodeTo() + require.Equal(t, data, buff) +} + +func TestWriteCFValueWithDelete(t *testing.T) { + var ts uint64 = 400036290571534337 + buff := make([]byte, 0, 9) + buff = append(buff, byte('D')) + buff = codec.EncodeUvarint(buff, ts) + + v := new(RawWriteCFValue) + err := v.ParseFrom(buff) + require.NoError(t, err) + require.True(t, v.IsDelete()) + require.False(t, v.HasShortValue()) + + data := v.EncodeTo() + require.Equal(t, data, buff) +} + +func TestWriteCFValueWithGcFence(t *testing.T) { + var ( + ts uint64 = 400036290571534337 + gcFence uint64 = 9527 + ) + + buff := make([]byte, 0, 9) + buff = append(buff, WriteTypePut) + buff = codec.EncodeUvarint(buff, ts) + buff = append(buff, flagOverlappedRollback) + buff = append(buff, flagGCFencePrefix) + buff = codec.EncodeUint(buff, gcFence) + + v := new(RawWriteCFValue) + err := v.ParseFrom(buff) + require.NoError(t, err) + require.Equal(t, v.startTs, ts) + require.True(t, v.hasGCFence) + require.Equal(t, v.gcFence, gcFence) + require.True(t, v.hasOverlappedRollback) + + data := v.EncodeTo() + require.Equal(t, data, buff) +}