From a88b1cf824f47501bb0c8b6009818b0fc1203240 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 23 Jan 2025 10:50:57 +0800 Subject: [PATCH 1/5] test: fix gc_safepoint Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/gc_safepoint/run.sh | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests/gc_safepoint/run.sh b/tests/integration_tests/gc_safepoint/run.sh index 66139279d8..959f37a2b9 100755 --- a/tests/integration_tests/gc_safepoint/run.sh +++ b/tests/integration_tests/gc_safepoint/run.sh @@ -1,5 +1,15 @@ #!/bin/bash +# This integration test verifies the GC (Garbage Collection) safepoint behavior when TiCDC is running in a cluster. +# It tests: +# 1. Safepoint advances normally when changefeeds are running +# 2. Safepoint stops advancing when all changefeeds are paused +# 3. Safepoint resumes advancing when paused changefeeds are resumed +# 4. Safepoint remains static when there's at least one paused changefeed, even if new changefeeds are created +# 5. Safepoint advances again after removing paused changefeeds +# 6. Safepoint is cleared when all changefeeds are removed +# The test supports multiple sink types (kafka, storage, pulsar, mysql) and includes data sync verification. + set -eu CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) @@ -79,7 +89,8 @@ function run() { ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac - export GO_FAILPOINTS='github.com/pingcap/tiflow/pkg/txnutil/gc/InjectGcSafepointUpdateInterval=return(500)' + # set gc safepoint update interval to 500ms to speed up the test, the default is 1 minute. + export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectGcSafepointUpdateInterval=return(500)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') case $SINK_TYPE in From ea5689a91c0bade73f069d98b31b32c0834f100a Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 23 Jan 2025 11:12:35 +0800 Subject: [PATCH 2/5] test: fix gc_safepoint 2 Signed-off-by: dongmen <414110582@qq.com> --- coordinator/coordinator.go | 7 ++++++- logservice/schemastore/persist_storage.go | 4 ---- logservice/schemastore/schema_store.go | 4 ---- pkg/txnutil/gc/gc_manager.go | 3 +++ tests/integration_tests/gc_safepoint/run.sh | 5 +++-- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 1190ccf5a1..67b25b86c9 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -46,6 +46,8 @@ var ( metricsDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("coordinator") ) +var updateGCTickerInterval = 1 * time.Minute + // coordinator implements the Coordinator interface type coordinator struct { nodeInfo *node.Info @@ -142,7 +144,10 @@ func (c *coordinator) recvMessages(_ context.Context, msg *messaging.TargetMessa // - if a node is removed, clean related state machine that bind to that node. // 3. Schedule changefeeds if all node is bootstrapped. func (c *coordinator) Run(ctx context.Context) error { - gcTick := time.NewTicker(time.Minute) + failpoint.Inject("InjectUpdateGCTickerInterval", func(val failpoint.Value) { + updateGCTickerInterval = time.Duration(val.(int) * int(time.Millisecond)) + }) + gcTick := time.NewTicker(updateGCTickerInterval) ctx, cancel := context.WithCancel(ctx) c.cancel = cancel defer gcTick.Stop() diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 68be7f7f5d..677dcd10d8 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -421,10 +421,6 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter for { allTargetTs := make([]uint64, 0, limit) p.mu.RLock() - // log.Debug("fetchTableTriggerDDLEvents in persistentStorage", - // zap.Any("start", start), - // zap.Int("limit", limit), - // zap.Any("tableTriggerDDLHistory", p.tableTriggerDDLHistory)) index := sort.Search(len(p.tableTriggerDDLHistory), func(i int) bool { return p.tableTriggerDDLHistory[i] > nextStartTs }) diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index a9ad124591..832ff3371c 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -290,10 +290,6 @@ func (s *schemaStore) FetchTableTriggerDDLEvents(tableFilter filter.Filter, star if limit == 0 { log.Panic("limit cannot be 0") } - // TODO: remove the following log - log.Debug("FetchTableTriggerDDLEvents", - zap.Uint64("start", start), - zap.Int("limit", limit)) // must get resolved ts first currentResolvedTs := s.resolvedTs.Load() if currentResolvedTs <= start { diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go index aa2262434b..7478ec8f75 100644 --- a/pkg/txnutil/gc/gc_manager.go +++ b/pkg/txnutil/gc/gc_manager.go @@ -90,6 +90,9 @@ func (m *gcManager) TryUpdateGCSafePoint( failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) { actual = uint64(val.(int)) }) + + log.Debug("update gc safe point", zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("actual", actual)) + if actual == checkpointTs { log.Info("update gc safe point success", zap.Uint64("gcSafePointTs", checkpointTs)) } diff --git a/tests/integration_tests/gc_safepoint/run.sh b/tests/integration_tests/gc_safepoint/run.sh index 959f37a2b9..58f86c6f41 100755 --- a/tests/integration_tests/gc_safepoint/run.sh +++ b/tests/integration_tests/gc_safepoint/run.sh @@ -17,7 +17,7 @@ source $CUR/../_utils/test_prepare WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 -MAX_RETRIES=10 +MAX_RETRIES=30 function get_safepoint() { pd_addr=$1 @@ -90,7 +90,8 @@ function run() { *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac # set gc safepoint update interval to 500ms to speed up the test, the default is 1 minute. - export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectGcSafepointUpdateInterval=return(500)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectGcSafepointUpdateInterval=return(100)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/coordinator/InjectUpdateGCTickerInterval=return(100)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') case $SINK_TYPE in From 7bcbc669b55666a8e3921d00dffb2902c5d1f2f6 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 23 Jan 2025 12:00:10 +0800 Subject: [PATCH 3/5] test: add debug log Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer_controller.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index a4359082ac..8d03bc83df 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -52,6 +52,7 @@ type Controller struct { messageCenter messaging.MessageCenter nodeManager *watcher.NodeManager tsoClient replica.TSOClient + pdAPIClient pdutil.PDAPIClient splitter *split.Splitter enableTableAcrossNodes bool @@ -67,7 +68,7 @@ type Controller struct { func NewController(changefeedID common.ChangeFeedID, checkpointTs uint64, - pdapi pdutil.PDAPIClient, + pdAPIClient pdutil.PDAPIClient, tsoClient replica.TSOClient, regionCache split.RegionCache, taskScheduler threadpool.ThreadPool, @@ -80,7 +81,7 @@ func NewController(changefeedID common.ChangeFeedID, var splitter *split.Splitter if cfConfig != nil && cfConfig.Scheduler.EnableTableAcrossNodes { enableTableAcrossNodes = true - splitter = split.NewSplitter(changefeedID, pdapi, regionCache, cfConfig.Scheduler) + splitter = split.NewSplitter(changefeedID, pdAPIClient, regionCache, cfConfig.Scheduler) } replicaSetDB := replica.NewReplicaSetDB(changefeedID, ddlSpan, enableTableAcrossNodes) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) @@ -97,13 +98,30 @@ func NewController(changefeedID common.ChangeFeedID, taskScheduler: taskScheduler, cfConfig: cfConfig, tsoClient: tsoClient, + pdAPIClient: pdAPIClient, splitter: splitter, enableTableAcrossNodes: enableTableAcrossNodes, } s.schedulerController = NewScheduleController(changefeedID, batchSize, oc, replicaSetDB, nodeManager, balanceInterval, s.splitter) + go s.fizz() return s } +// fizz is only for test, remove it after test +func (c *Controller) fizz() { + log.Info("start fuzz") + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for range ticker.C { + serviceGCSafepoint, err := c.pdAPIClient.ListGcServiceSafePoint(context.Background()) + if err != nil { + log.Error("failed to get gc safepoint", zap.Error(err)) + continue + } + log.Info("service gc safepoint", zap.Any("serviceGCSafepoint", serviceGCSafepoint)) + } +} + // HandleStatus handle the status report from the node func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) { for _, status := range statusList { From aa784f0e14443df5de924533dafc74b6901ca0d6 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 23 Jan 2025 15:46:36 +0800 Subject: [PATCH 4/5] *: remove changefeed gc safepoint when it is bootstrapped Signed-off-by: dongmen <414110582@qq.com> --- api/v2/changefeed.go | 11 +- coordinator/changefeed/changefeed.go | 9 + coordinator/controller.go | 9 +- coordinator/coordinator.go | 25 +- heartbeatpb/heartbeat.pb.go | 289 +++++++++++++--------- heartbeatpb/heartbeat.proto | 1 + logservice/schemastore/persist_storage.go | 3 +- maintainer/maintainer.go | 48 ++-- maintainer/maintainer_controller.go | 16 -- maintainer/maintainer_manager.go | 24 +- maintainer/maintainer_manager_test.go | 2 +- pkg/txnutil/gc/gc_manager.go | 5 +- pkg/txnutil/gc/gc_service.go | 32 ++- server/module_election.go | 2 +- 14 files changed, 289 insertions(+), 187 deletions(-) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index ffb42ed042..120ac64609 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -110,10 +110,12 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { } // Ensure the start ts is valid in the next 3600 seconds, aka 1 hour const ensureTTL = 60 * 60 + createGcServiceID := h.server.GetEtcdClient().GetGCServiceID() if err = gc.EnsureChangefeedStartTsSafety( ctx, h.server.GetPdClient(), - h.server.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating), + createGcServiceID, + gc.EnsureGCServiceCreating, changefeedID, ensureTTL, cfg.StartTs); err != nil { if !errors.ErrStartTsBeforeGC.Equal(err) { @@ -176,7 +178,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { err := gc.UndoEnsureChangefeedStartTsSafety( ctx, pdClient, - h.server.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating), + createGcServiceID, changefeedID, ) if err != nil { @@ -200,7 +202,9 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { log.Info("Create changefeed successfully!", zap.String("id", info.ChangefeedID.Name()), - zap.String("changefeed", info.String())) + zap.String("state", string(info.State)), + zap.String("changefeedInfo", info.String())) + c.JSON(http.StatusOK, toAPIModel( info, &config.ChangeFeedStatus{ @@ -657,6 +661,7 @@ func verifyResumeChangefeedConfig( ctx, pdClient, gcServiceID, + gc.EnsureGCServiceResuming, changefeedID, gcTTL, overrideCheckpointTs) if err != nil { diff --git a/coordinator/changefeed/changefeed.go b/coordinator/changefeed/changefeed.go index 6bea723802..efe2d3bc2b 100644 --- a/coordinator/changefeed/changefeed.go +++ b/coordinator/changefeed/changefeed.go @@ -92,6 +92,7 @@ func NewChangefeed(cfID common.ChangeFeedID, log.Info("changefeed instance created", zap.String("id", cfID.String()), zap.Uint64("checkpointTs", checkpointTs), + zap.String("state", string(info.State)), zap.String("info", info.String())) return res } @@ -138,6 +139,13 @@ func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool if newStatus != nil && newStatus.CheckpointTs >= old.CheckpointTs { c.status.Store(newStatus) + if old.BootstrapDone != newStatus.BootstrapDone { + log.Info("Received changefeed status with bootstrapDone", + zap.String("changefeed", c.ID.String()), + zap.Bool("bootstrapDone", newStatus.BootstrapDone)) + return true, model.StateNormal, nil + } + info := c.GetInfo() // the changefeed reaches the targetTs if info.TargetTs != 0 && newStatus.CheckpointTs >= info.TargetTs { @@ -145,6 +153,7 @@ func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool } return c.backoff.CheckStatus(newStatus) } + return false, model.StateNormal, nil } diff --git a/coordinator/controller.go b/coordinator/controller.go index d4b100be2d..f4d8444e2a 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -295,12 +295,14 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.Mainta } if nodeID != from { // todo: handle the case that the node id is mismatch - log.Warn("node id not match", + log.Warn("remote changefeed maintainer nodeID mismatch with local record", zap.String("changefeed", cfID.Name()), - zap.Stringer("from", from), - zap.Stringer("node", nodeID)) + zap.Stringer("remoteNodeID", from), + zap.Stringer("localNodeID", nodeID)) continue } + cfs[cfID] = cf + changed, state, err := cf.UpdateStatus(status) if changed { log.Info("changefeed status changed", @@ -322,7 +324,6 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.Mainta err: mErr, } } - cfs[cfID] = cf } select { case c.updatedChangefeedCh <- cfs: diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 67b25b86c9..65ac357270 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -52,6 +52,7 @@ var updateGCTickerInterval = 1 * time.Minute type coordinator struct { nodeInfo *node.Info version int64 + gcServiceID string lastTickTime time.Time controller *Controller @@ -76,7 +77,7 @@ func New(node *node.Info, pdClient pd.Client, pdClock pdutil.Clock, backend changefeed.Backend, - clusterID string, + gcServiceID string, version int64, batchSize int, balanceCheckInterval time.Duration, @@ -85,8 +86,9 @@ func New(node *node.Info, c := &coordinator{ version: version, nodeInfo: node, + gcServiceID: gcServiceID, lastTickTime: time.Now(), - gcManager: gc.NewManager(clusterID, pdClient, pdClock), + gcManager: gc.NewManager(gcServiceID, pdClient, pdClock), eventCh: chann.NewAutoDrainChann[*Event](), pdClient: pdClient, pdClock: pdClock, @@ -224,6 +226,20 @@ func (c *coordinator) handleStateChangedEvent(ctx context.Context, event *Change c.controller.changefeedDB.Resume(event.ChangefeedID, false, false) case model.StateFailed, model.StateFinished: c.controller.operatorController.StopChangefeed(ctx, event.ChangefeedID, false) + case model.StateNormal: + log.Info("changefeed is resumed or created successfully, try to delete its gc safepoint", + zap.String("changefeed", event.ChangefeedID.String())) + // We need to clean its gc safepoint when changefeed is resumed or created + gcServiceID := c.getEnsureGCServiceID(gc.EnsureGCServiceCreating) + err := gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.ChangefeedID) + if err != nil { + log.Warn("failed to delete create changefeed gc safepoint", zap.Error(err)) + } + gcServiceID = c.getEnsureGCServiceID(gc.EnsureGCServiceResuming) + err = gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.ChangefeedID) + if err != nil { + log.Warn("failed to delete resume changefeed gc safepoint", zap.Error(err)) + } default: } return nil @@ -363,3 +379,8 @@ func (c *coordinator) updateGCSafepoint( err := c.gcManager.TryUpdateGCSafePoint(ctx, gcSafepointUpperBound, false) return errors.Trace(err) } + +// GetEnsureGCServiceID return the prefix for the gc service id when changefeed is creating +func (c *coordinator) getEnsureGCServiceID(tag string) string { + return c.gcServiceID + tag +} diff --git a/heartbeatpb/heartbeat.pb.go b/heartbeatpb/heartbeat.pb.go index 9c51a4cf59..65f9c0f2bf 100644 --- a/heartbeatpb/heartbeat.pb.go +++ b/heartbeatpb/heartbeat.pb.go @@ -885,11 +885,12 @@ func (m *MaintainerHeartbeat) GetStatuses() []*MaintainerStatus { } type MaintainerStatus struct { - ChangefeedID *ChangefeedID `protobuf:"bytes,1,opt,name=changefeedID,proto3" json:"changefeedID,omitempty"` - FeedState string `protobuf:"bytes,2,opt,name=feed_state,json=feedState,proto3" json:"feed_state,omitempty"` - State ComponentState `protobuf:"varint,3,opt,name=state,proto3,enum=heartbeatpb.ComponentState" json:"state,omitempty"` - CheckpointTs uint64 `protobuf:"varint,4,opt,name=checkpoint_ts,json=checkpointTs,proto3" json:"checkpoint_ts,omitempty"` - Err []*RunningError `protobuf:"bytes,5,rep,name=err,proto3" json:"err,omitempty"` + ChangefeedID *ChangefeedID `protobuf:"bytes,1,opt,name=changefeedID,proto3" json:"changefeedID,omitempty"` + FeedState string `protobuf:"bytes,2,opt,name=feed_state,json=feedState,proto3" json:"feed_state,omitempty"` + State ComponentState `protobuf:"varint,3,opt,name=state,proto3,enum=heartbeatpb.ComponentState" json:"state,omitempty"` + CheckpointTs uint64 `protobuf:"varint,4,opt,name=checkpoint_ts,json=checkpointTs,proto3" json:"checkpoint_ts,omitempty"` + Err []*RunningError `protobuf:"bytes,5,rep,name=err,proto3" json:"err,omitempty"` + BootstrapDone bool `protobuf:"varint,6,opt,name=bootstrap_done,json=bootstrapDone,proto3" json:"bootstrap_done,omitempty"` } func (m *MaintainerStatus) Reset() { *m = MaintainerStatus{} } @@ -960,6 +961,13 @@ func (m *MaintainerStatus) GetErr() []*RunningError { return nil } +func (m *MaintainerStatus) GetBootstrapDone() bool { + if m != nil { + return m.BootstrapDone + } + return false +} + type CoordinatorBootstrapRequest struct { Version int64 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` } @@ -2369,10 +2377,10 @@ func (m *ChangefeedID) GetLow() uint64 { } func (m *ChangefeedID) GetName() string { - if m == nil { - return "" + if m != nil { + return m.Name } - return m.Name + return "" } func (m *ChangefeedID) GetNamespace() string { @@ -2429,121 +2437,123 @@ func init() { func init() { proto.RegisterFile("heartbeatpb/heartbeat.proto", fileDescriptor_6d584080fdadb670) } var fileDescriptor_6d584080fdadb670 = []byte{ - // 1821 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x19, 0x4d, 0x6f, 0x1b, 0xc7, - 0x55, 0xbb, 0x4b, 0x52, 0xe4, 0xa3, 0x24, 0x6f, 0xc6, 0xb1, 0x4d, 0x5b, 0x36, 0xa3, 0x4c, 0x7b, - 0x60, 0x95, 0x56, 0x86, 0x95, 0x18, 0x69, 0x8b, 0xa6, 0xa9, 0x44, 0xb9, 0x09, 0x21, 0x98, 0x11, - 0x86, 0x2a, 0xdc, 0xf4, 0x42, 0xac, 0x76, 0x47, 0xd4, 0x42, 0xe4, 0xee, 0x7a, 0x67, 0x29, 0xc9, - 0x01, 0x7a, 0xea, 0xb5, 0x87, 0x1e, 0x7b, 0x68, 0x0f, 0xb9, 0xb5, 0xbf, 0xa4, 0x3d, 0xe6, 0xd4, - 0xf6, 0xd0, 0x43, 0x61, 0xa3, 0x7f, 0xa0, 0x40, 0xd1, 0x6b, 0x31, 0xb3, 0x33, 0xfb, 0xc5, 0xa5, - 0x24, 0x43, 0x44, 0x4e, 0x9c, 0x79, 0xf3, 0xbe, 0xe6, 0x7d, 0xcf, 0x12, 0xd6, 0x4f, 0xa8, 0x15, - 0x46, 0x47, 0xd4, 0x8a, 0x82, 0xa3, 0xc7, 0xc9, 0x7a, 0x2b, 0x08, 0xfd, 0xc8, 0x47, 0xcd, 0xcc, - 0x21, 0xfe, 0x12, 0x1a, 0x87, 0xd6, 0xd1, 0x98, 0x0e, 0x02, 0xcb, 0x43, 0x2d, 0x58, 0x16, 0x9b, - 0xde, 0x5e, 0x4b, 0xdb, 0xd0, 0x3a, 0x06, 0x51, 0x5b, 0xf4, 0x00, 0xea, 0x83, 0xc8, 0x0a, 0xa3, - 0x7d, 0xfa, 0xaa, 0xa5, 0x6f, 0x68, 0x9d, 0x15, 0x92, 0xec, 0xd1, 0x5d, 0xa8, 0x3d, 0xf3, 0x1c, - 0x7e, 0x62, 0x88, 0x13, 0xb9, 0xc3, 0xbf, 0xd7, 0xc1, 0xfc, 0x9c, 0x8b, 0xda, 0xa5, 0x56, 0x44, - 0xe8, 0xcb, 0x29, 0x65, 0x11, 0xfa, 0x04, 0x56, 0xec, 0x13, 0xcb, 0x1b, 0xd1, 0x63, 0x4a, 0x1d, - 0x29, 0xa7, 0xb9, 0x7d, 0x7f, 0x2b, 0xa3, 0xd3, 0x56, 0x37, 0x83, 0x40, 0x72, 0xe8, 0xe8, 0x23, - 0x68, 0x9c, 0x5b, 0x11, 0x0d, 0x27, 0x56, 0x78, 0x2a, 0x14, 0x69, 0x6e, 0xdf, 0xcd, 0xd1, 0xbe, - 0x50, 0xa7, 0x24, 0x45, 0x44, 0x3f, 0x84, 0x3a, 0x8b, 0xac, 0x68, 0xca, 0x28, 0x6b, 0x19, 0x1b, - 0x46, 0xa7, 0xb9, 0xfd, 0x30, 0x47, 0x94, 0x58, 0x60, 0x20, 0xb0, 0x48, 0x82, 0x8d, 0x3a, 0x70, - 0xcb, 0xf6, 0x27, 0x01, 0x1d, 0xd3, 0x88, 0xc6, 0x87, 0xad, 0xca, 0x86, 0xd6, 0xa9, 0x93, 0x22, - 0x18, 0x7d, 0x00, 0x06, 0x0d, 0xc3, 0x56, 0xb5, 0xe4, 0x3e, 0x64, 0xea, 0x79, 0xae, 0x37, 0x7a, - 0x16, 0x86, 0x7e, 0x48, 0x38, 0x16, 0xb6, 0xa0, 0x91, 0x28, 0x8a, 0x30, 0x37, 0x09, 0xb5, 0x4f, - 0x03, 0xdf, 0xf5, 0xa2, 0x43, 0x26, 0x4c, 0x52, 0x21, 0x39, 0x18, 0x6a, 0x03, 0x84, 0x94, 0xf9, - 0xe3, 0x33, 0xea, 0x1c, 0x32, 0x71, 0xf1, 0x0a, 0xc9, 0x40, 0x90, 0x09, 0x06, 0xa3, 0x2f, 0x85, - 0x03, 0x2a, 0x84, 0x2f, 0xf1, 0xaf, 0xc1, 0xdc, 0x73, 0x59, 0x60, 0x45, 0xf6, 0x09, 0x0d, 0x77, - 0xec, 0xc8, 0xf5, 0x3d, 0xf4, 0x01, 0xd4, 0x2c, 0xb1, 0x12, 0x32, 0xd6, 0xb6, 0x6f, 0xe7, 0xd4, - 0x8c, 0x91, 0x88, 0x44, 0xe1, 0x2e, 0xef, 0xfa, 0x93, 0x89, 0x1b, 0x25, 0x02, 0x93, 0x3d, 0xda, - 0x80, 0x66, 0x8f, 0x0d, 0x5e, 0x79, 0xf6, 0x01, 0xd7, 0x4f, 0x88, 0xad, 0x93, 0x2c, 0x08, 0x77, - 0xc1, 0xd8, 0xe9, 0xee, 0xe7, 0x98, 0x68, 0x97, 0x33, 0xd1, 0x67, 0x99, 0xfc, 0x46, 0x87, 0x3b, - 0x3d, 0xef, 0x78, 0x3c, 0xa5, 0x9e, 0x4d, 0x9d, 0xf4, 0x3a, 0x0c, 0xfd, 0x0c, 0x56, 0x93, 0x83, - 0xc3, 0x57, 0x01, 0x95, 0x17, 0x7a, 0x90, 0xbb, 0x50, 0x0e, 0x83, 0xe4, 0x09, 0xd0, 0xa7, 0xb0, - 0x9a, 0x32, 0xec, 0xed, 0xf1, 0x3b, 0x1a, 0x33, 0x9e, 0xcb, 0x62, 0x90, 0x3c, 0xbe, 0x48, 0x09, - 0xfb, 0x84, 0x4e, 0xac, 0xde, 0x9e, 0x30, 0x80, 0x41, 0x92, 0x3d, 0xda, 0x87, 0xdb, 0xf4, 0xc2, - 0x1e, 0x4f, 0x1d, 0x9a, 0xa1, 0x71, 0x44, 0xe8, 0x5c, 0x2a, 0xa2, 0x8c, 0x0a, 0xff, 0x45, 0xcb, - 0xba, 0x52, 0x86, 0xdb, 0x2f, 0xe1, 0x8e, 0x5b, 0x66, 0x19, 0x99, 0x50, 0xb8, 0xdc, 0x10, 0x59, - 0x4c, 0x52, 0xce, 0x00, 0x3d, 0x4d, 0x82, 0x24, 0xce, 0xaf, 0x47, 0x73, 0xd4, 0x2d, 0x84, 0x0b, - 0x06, 0xc3, 0xb2, 0x4f, 0x85, 0x25, 0x9a, 0xdb, 0x66, 0x3e, 0xb0, 0xba, 0xfb, 0x84, 0x1f, 0xe2, - 0xaf, 0x35, 0x78, 0x27, 0x53, 0x11, 0x58, 0xe0, 0x7b, 0x8c, 0xde, 0xb4, 0x24, 0x3c, 0x07, 0xe4, - 0x14, 0xac, 0x43, 0x95, 0x37, 0xe7, 0xe9, 0x2e, 0xf3, 0xbc, 0x84, 0x10, 0x5f, 0xc0, 0xed, 0x6e, - 0x26, 0xf3, 0x9e, 0x53, 0xc6, 0xac, 0xd1, 0x8d, 0x95, 0x2c, 0xe6, 0xb8, 0x3e, 0x9b, 0xe3, 0xf8, - 0xef, 0x39, 0x3f, 0x77, 0x7d, 0xef, 0xd8, 0x1d, 0xa1, 0x4d, 0xa8, 0xb0, 0xc0, 0xf2, 0xa4, 0xbc, - 0xbb, 0xe5, 0x65, 0x8b, 0x08, 0x1c, 0x5e, 0xbe, 0x19, 0x2f, 0xca, 0x09, 0x7f, 0xb5, 0xe5, 0xda, - 0x3b, 0x99, 0x38, 0x93, 0x5e, 0xba, 0x24, 0x10, 0x73, 0xe8, 0x3c, 0xd4, 0x99, 0x0a, 0xf5, 0x4a, - 0x1c, 0xea, 0x6a, 0x8f, 0x30, 0xac, 0xda, 0xd3, 0x30, 0xa4, 0x5e, 0x34, 0x0c, 0x9c, 0x61, 0xc4, - 0x44, 0x05, 0xac, 0x90, 0xa6, 0x04, 0x1e, 0x38, 0x87, 0x0c, 0xff, 0x4d, 0x83, 0xfb, 0x3c, 0x37, - 0x9c, 0xe9, 0x38, 0x13, 0xda, 0x0b, 0x6a, 0x09, 0x4f, 0xa1, 0x66, 0x0b, 0x5b, 0x5d, 0x11, 0xaf, - 0xb1, 0x41, 0x89, 0x44, 0x46, 0x5d, 0x58, 0x63, 0x52, 0xa5, 0x38, 0x92, 0x85, 0x51, 0xd6, 0xb6, - 0xd7, 0x73, 0xe4, 0x83, 0x1c, 0x0a, 0x29, 0x90, 0xe0, 0x03, 0xb8, 0xfd, 0xdc, 0x72, 0xbd, 0xc8, - 0x72, 0x3d, 0x1a, 0x7e, 0xae, 0xe8, 0xd0, 0x8f, 0x32, 0xfd, 0x46, 0x2b, 0x09, 0xc4, 0x94, 0xa6, - 0xd8, 0x70, 0xf0, 0x7f, 0x35, 0x30, 0x8b, 0xc7, 0x37, 0xb5, 0xd0, 0x23, 0x00, 0xbe, 0x1a, 0x72, - 0x21, 0x54, 0x58, 0xa9, 0x41, 0x1a, 0x1c, 0xc2, 0xd9, 0x53, 0xf4, 0x04, 0xaa, 0xf1, 0x49, 0x99, - 0x01, 0xba, 0xfe, 0x24, 0xf0, 0x3d, 0xea, 0x45, 0x02, 0x97, 0xc4, 0x98, 0xe8, 0x3b, 0xb0, 0x9a, - 0x86, 0x2e, 0x77, 0x7a, 0xa5, 0xa4, 0x67, 0x25, 0x1d, 0xd1, 0xb8, 0x46, 0x47, 0xfc, 0x18, 0xd6, - 0xbb, 0xbe, 0x1f, 0x3a, 0xae, 0x67, 0x45, 0x7e, 0xb8, 0xeb, 0xfb, 0x11, 0x8b, 0x42, 0x2b, 0x50, - 0x31, 0xd2, 0x82, 0xe5, 0x33, 0x1a, 0x32, 0xd5, 0xba, 0x0c, 0xa2, 0xb6, 0xf8, 0x4b, 0x78, 0x58, - 0x4e, 0x28, 0xab, 0xcb, 0x0d, 0x7c, 0xf1, 0x27, 0x0d, 0xde, 0xdd, 0x71, 0x9c, 0x14, 0x43, 0x69, - 0xf3, 0x3d, 0xd0, 0x5d, 0xe7, 0x6a, 0x2f, 0xe8, 0xae, 0xc3, 0x87, 0xa3, 0x4c, 0x74, 0xae, 0x24, - 0xe1, 0x37, 0x63, 0x41, 0xa3, 0xc4, 0x82, 0x1d, 0x30, 0x5d, 0x36, 0xf4, 0xe8, 0xf9, 0x50, 0xf8, - 0x93, 0xb3, 0x95, 0xe3, 0xc7, 0x9a, 0xcb, 0xfa, 0xf4, 0xbc, 0xab, 0xa0, 0xf8, 0x02, 0xee, 0x11, - 0x3a, 0xf1, 0xcf, 0xe8, 0x8d, 0x94, 0x6d, 0xc1, 0xb2, 0x6d, 0x31, 0xdb, 0x72, 0xa8, 0xec, 0xc6, - 0x6a, 0xcb, 0x4f, 0x42, 0xc1, 0xdf, 0x91, 0xcd, 0x5e, 0x6d, 0xf1, 0x1f, 0x75, 0x78, 0x90, 0x0a, - 0x9d, 0x71, 0xdc, 0x0d, 0x43, 0x77, 0x9e, 0xf9, 0xee, 0x0b, 0xaf, 0x86, 0x19, 0xcb, 0x25, 0xb5, - 0xce, 0x86, 0xf7, 0x23, 0x5e, 0x18, 0x87, 0x51, 0xe8, 0x8e, 0x46, 0x34, 0x1c, 0xd2, 0x33, 0x5e, - 0x9c, 0xd2, 0x82, 0x36, 0x74, 0xaf, 0xd1, 0x89, 0x1f, 0x09, 0x1e, 0x87, 0x31, 0x8b, 0x67, 0x9c, - 0x43, 0xb6, 0x27, 0x97, 0x7a, 0xa6, 0x5a, 0xea, 0x99, 0x7f, 0x6b, 0xb0, 0x5e, 0x6a, 0x9f, 0xc5, - 0x74, 0xbf, 0xa7, 0x50, 0xe5, 0xb5, 0x5f, 0x35, 0xbc, 0xf7, 0x72, 0x74, 0x89, 0xb4, 0xb4, 0x53, - 0xc4, 0xd8, 0x2a, 0x37, 0x8d, 0xeb, 0x4c, 0xab, 0xd7, 0xca, 0x76, 0xfc, 0x3f, 0x0d, 0xda, 0xe9, - 0x3d, 0x0f, 0x7c, 0x16, 0x2d, 0x3a, 0x16, 0xae, 0xe5, 0x58, 0xfd, 0x86, 0x8e, 0x7d, 0x02, 0xcb, - 0x71, 0x6b, 0x53, 0x2f, 0x85, 0x7b, 0x33, 0xfd, 0x60, 0x62, 0xf5, 0xbc, 0x63, 0x9f, 0x28, 0x3c, - 0xfc, 0x1f, 0x0d, 0xde, 0x9b, 0x7b, 0xf3, 0xc5, 0x78, 0xf9, 0x5b, 0xb9, 0xfa, 0xdb, 0xc4, 0x04, - 0xbe, 0x00, 0x48, 0x6d, 0x91, 0x9b, 0x85, 0xb5, 0xc2, 0x2c, 0xdc, 0x56, 0x98, 0x7d, 0x6b, 0xa2, - 0xba, 0x4f, 0x06, 0x82, 0xb6, 0xa0, 0x26, 0xc2, 0x53, 0x19, 0xbc, 0x64, 0xc6, 0x11, 0xf6, 0x96, - 0x58, 0xb8, 0x2b, 0x5f, 0xac, 0x42, 0xf0, 0xfc, 0x17, 0xeb, 0x43, 0x89, 0x96, 0x91, 0x9a, 0x02, - 0xf0, 0x9f, 0x75, 0x40, 0xb3, 0xd9, 0xc1, 0x6b, 0xe5, 0x1c, 0xe7, 0xe4, 0x0c, 0xa9, 0xcb, 0x17, - 0xb1, 0xba, 0xb2, 0x5e, 0xb8, 0xb2, 0x1a, 0xda, 0x8c, 0x6b, 0x0c, 0x6d, 0x3f, 0x07, 0xd3, 0x56, - 0x3d, 0x76, 0xc8, 0xd2, 0x27, 0xe6, 0x15, 0x8d, 0xf8, 0x96, 0x9d, 0xdd, 0x4f, 0xd9, 0x6c, 0x92, - 0x56, 0x4b, 0x1a, 0xca, 0x87, 0xd0, 0x3c, 0x1a, 0xfb, 0xf6, 0xa9, 0x1c, 0x05, 0x6a, 0x42, 0x3f, - 0x94, 0x8f, 0x70, 0xc1, 0x1e, 0x04, 0x9a, 0x58, 0xe3, 0x97, 0x70, 0x37, 0x0d, 0xef, 0xee, 0xd8, - 0x67, 0x74, 0x41, 0x09, 0x9d, 0x69, 0x2a, 0x7a, 0xbe, 0xa9, 0x84, 0x70, 0x6f, 0x46, 0xe4, 0x62, - 0x32, 0x89, 0xcf, 0xc8, 0x53, 0xdb, 0xa6, 0x8c, 0x29, 0x99, 0x72, 0x8b, 0x7f, 0xab, 0x81, 0x99, - 0x3e, 0x94, 0xe2, 0x60, 0x5b, 0xc0, 0x3b, 0xf3, 0x01, 0xd4, 0x65, 0x48, 0xc6, 0x35, 0xda, 0x20, - 0xc9, 0xfe, 0xb2, 0x27, 0x24, 0xfe, 0x04, 0xaa, 0x02, 0xef, 0x8a, 0x8f, 0x32, 0x73, 0x42, 0x10, - 0x7b, 0xb0, 0xa6, 0xd6, 0xb1, 0x35, 0x2e, 0xe1, 0xb3, 0x01, 0xcd, 0x2f, 0xc6, 0x4e, 0x81, 0x55, - 0x16, 0xc4, 0x31, 0xfa, 0xf4, 0xbc, 0xa0, 0x6b, 0x16, 0x84, 0xbf, 0x36, 0xa0, 0x1a, 0x8f, 0x93, - 0x0f, 0xa1, 0xd1, 0x63, 0xbb, 0x3c, 0x7c, 0x68, 0x3c, 0x76, 0xd4, 0x49, 0x0a, 0xe0, 0x5a, 0x88, - 0x65, 0xfa, 0x46, 0x91, 0x5b, 0xf4, 0x29, 0x34, 0xe3, 0xa5, 0x2a, 0x06, 0xb3, 0xc3, 0x7c, 0xd1, - 0x3d, 0x24, 0x4b, 0x81, 0xf6, 0xe1, 0x9d, 0x3e, 0xa5, 0xce, 0x5e, 0xe8, 0x07, 0x81, 0xc2, 0x90, - 0x8d, 0xfe, 0x0a, 0x36, 0xb3, 0x74, 0xe8, 0x27, 0x70, 0x8b, 0x03, 0x77, 0x1c, 0x27, 0x61, 0x15, - 0x0f, 0xb2, 0x68, 0x36, 0x9b, 0x49, 0x11, 0x95, 0x3f, 0x2e, 0x7e, 0x11, 0x38, 0x56, 0x44, 0xa5, - 0x09, 0x59, 0xab, 0x26, 0x88, 0xd7, 0xcb, 0x9a, 0x89, 0x74, 0x10, 0x29, 0x90, 0x14, 0xbf, 0x8f, - 0x2c, 0xcf, 0x7c, 0x1f, 0x41, 0x3f, 0x10, 0x93, 0xfb, 0x88, 0xb6, 0xea, 0x22, 0x2a, 0xf3, 0xad, - 0x6a, 0x57, 0x66, 0xf0, 0x28, 0x9e, 0xda, 0x47, 0x14, 0x9f, 0xc2, 0xbb, 0x49, 0xf5, 0x51, 0xa7, - 0xbc, 0x74, 0xbc, 0x45, 0xd5, 0xeb, 0xa8, 0xb7, 0x82, 0x3e, 0xb7, 0x74, 0xc4, 0x08, 0xf8, 0x9f, - 0x1a, 0xdc, 0x2a, 0x7c, 0x57, 0x7b, 0x1b, 0x41, 0x65, 0x65, 0x51, 0x5f, 0x44, 0x59, 0x2c, 0x9b, - 0xb3, 0x9f, 0xc0, 0x9d, 0xb8, 0xa1, 0x32, 0xf7, 0x2b, 0x3a, 0x0c, 0x68, 0x38, 0x64, 0xd4, 0xf6, - 0xbd, 0x78, 0x4c, 0xd4, 0x09, 0x12, 0x87, 0x03, 0xf7, 0x2b, 0x7a, 0x40, 0xc3, 0x81, 0x38, 0xc1, - 0x7f, 0xd0, 0x00, 0x65, 0x6c, 0xb8, 0xa0, 0x8a, 0xf8, 0x19, 0xac, 0x1e, 0xa5, 0x4c, 0x93, 0xcf, - 0x18, 0xef, 0x97, 0x77, 0x90, 0xac, 0xfc, 0x3c, 0x1d, 0x76, 0x60, 0x25, 0xdb, 0xb3, 0x11, 0x82, - 0x4a, 0xe4, 0x4e, 0xe2, 0xf2, 0xd5, 0x20, 0x62, 0xcd, 0x61, 0x9e, 0xef, 0xa8, 0xe6, 0x28, 0xd6, - 0x1c, 0x66, 0x73, 0x98, 0x11, 0xc3, 0xf8, 0x9a, 0xa7, 0xec, 0x24, 0xfe, 0x0a, 0x22, 0xec, 0xd1, - 0x20, 0x6a, 0x8b, 0x3f, 0x82, 0x95, 0xac, 0xe3, 0x38, 0xf5, 0x89, 0x3b, 0x3a, 0x91, 0x5f, 0xfa, - 0xc4, 0x1a, 0x99, 0x60, 0x8c, 0xfd, 0x73, 0x99, 0xec, 0x7c, 0x89, 0x8f, 0x61, 0x25, 0x6b, 0x82, - 0xeb, 0x51, 0x09, 0x6d, 0x79, 0x2b, 0x97, 0x9a, 0xf1, 0x35, 0x2f, 0x35, 0xfc, 0x97, 0x05, 0x96, - 0xad, 0x74, 0x4b, 0x01, 0x9b, 0x8f, 0xa0, 0x26, 0xbf, 0x7b, 0x36, 0xa0, 0xfa, 0x22, 0x74, 0x23, - 0x6a, 0x2e, 0xa1, 0x3a, 0x54, 0x0e, 0x2c, 0xc6, 0x4c, 0x6d, 0xb3, 0x13, 0x57, 0xc8, 0xf4, 0x35, - 0x8f, 0x00, 0x6a, 0xdd, 0x90, 0x5a, 0x02, 0x0f, 0xa0, 0x16, 0x3f, 0xa8, 0x4c, 0x6d, 0xf3, 0xc7, - 0x00, 0x69, 0x32, 0x71, 0x0e, 0xfd, 0x2f, 0xfa, 0xcf, 0xcc, 0x25, 0xd4, 0x84, 0xe5, 0x17, 0x3b, - 0xbd, 0xc3, 0x5e, 0xff, 0x33, 0x53, 0x13, 0x1b, 0x12, 0x6f, 0x74, 0x8e, 0xb3, 0xc7, 0x71, 0x8c, - 0xcd, 0xef, 0x17, 0x1a, 0x08, 0x5a, 0x06, 0x63, 0x67, 0x3c, 0x36, 0x97, 0x50, 0x0d, 0xf4, 0xbd, - 0x5d, 0x53, 0xe3, 0x92, 0xfa, 0x7e, 0x38, 0xb1, 0xc6, 0xa6, 0xbe, 0xf9, 0x31, 0xac, 0xe5, 0x03, - 0x5a, 0xb0, 0xf5, 0xc3, 0x53, 0xd7, 0x1b, 0xc5, 0x02, 0x07, 0x91, 0xa8, 0x52, 0xb1, 0xc0, 0x58, - 0x43, 0xc7, 0xd4, 0x77, 0x7f, 0xfa, 0xd7, 0xd7, 0x6d, 0xed, 0x9b, 0xd7, 0x6d, 0xed, 0x5f, 0xaf, - 0xdb, 0xda, 0xef, 0xde, 0xb4, 0x97, 0xbe, 0x79, 0xd3, 0x5e, 0xfa, 0xc7, 0x9b, 0xf6, 0xd2, 0xaf, - 0xbe, 0x3b, 0x72, 0xa3, 0x93, 0xe9, 0xd1, 0x96, 0xed, 0x4f, 0x1e, 0x07, 0xae, 0x37, 0xb2, 0xad, - 0xe0, 0x71, 0xe4, 0xda, 0x8e, 0xfd, 0x38, 0x13, 0x53, 0x47, 0x35, 0xf1, 0xd7, 0xc0, 0x87, 0xff, - 0x0f, 0x00, 0x00, 0xff, 0xff, 0x8c, 0xfa, 0xc2, 0x4b, 0x39, 0x18, 0x00, 0x00, + // 1841 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x19, 0x4d, 0x73, 0x1b, 0x49, + 0xd5, 0x33, 0x23, 0xcb, 0xd2, 0x93, 0xed, 0xcc, 0x76, 0x36, 0x89, 0x12, 0x27, 0x5a, 0x6f, 0x03, + 0x55, 0xc2, 0x0b, 0x4e, 0xc5, 0xbb, 0xa9, 0x05, 0x8a, 0x65, 0xb1, 0xa5, 0xb0, 0xab, 0x72, 0x45, + 0xeb, 0x6a, 0x99, 0x0a, 0xcb, 0x45, 0x35, 0x9a, 0x69, 0xcb, 0x53, 0x96, 0x66, 0x26, 0xd3, 0xa3, + 0x38, 0xd9, 0x2a, 0x4e, 0x5c, 0x39, 0x70, 0xe4, 0xb0, 0x1c, 0xf6, 0x06, 0xbf, 0x04, 0x8e, 0x7b, + 0x02, 0x0e, 0x1c, 0xa8, 0xa4, 0xf8, 0x03, 0x5c, 0xb8, 0x52, 0xdd, 0xd3, 0x3d, 0x5f, 0x1a, 0xd9, + 0x4e, 0x59, 0xc5, 0x49, 0xdd, 0xaf, 0xdf, 0x57, 0xbf, 0xef, 0x1e, 0xc1, 0xd6, 0x29, 0xb5, 0xc2, + 0x68, 0x44, 0xad, 0x28, 0x18, 0x3d, 0x4c, 0xd6, 0xbb, 0x41, 0xe8, 0x47, 0x3e, 0x6a, 0x64, 0x0e, + 0xf1, 0x97, 0x50, 0x3f, 0xb6, 0x46, 0x13, 0x3a, 0x08, 0x2c, 0x0f, 0x35, 0x61, 0x4d, 0x6c, 0x7a, + 0xdd, 0xa6, 0xb6, 0xad, 0xb5, 0x0d, 0xa2, 0xb6, 0xe8, 0x1e, 0xd4, 0x06, 0x91, 0x15, 0x46, 0x87, + 0xf4, 0x55, 0x53, 0xdf, 0xd6, 0xda, 0xeb, 0x24, 0xd9, 0xa3, 0xdb, 0x50, 0x7d, 0xe2, 0x39, 0xfc, + 0xc4, 0x10, 0x27, 0x72, 0x87, 0xff, 0xa0, 0x83, 0xf9, 0x39, 0x17, 0x75, 0x40, 0xad, 0x88, 0xd0, + 0xe7, 0x33, 0xca, 0x22, 0xf4, 0x09, 0xac, 0xdb, 0xa7, 0x96, 0x37, 0xa6, 0x27, 0x94, 0x3a, 0x52, + 0x4e, 0x63, 0xef, 0xee, 0x6e, 0x46, 0xa7, 0xdd, 0x4e, 0x06, 0x81, 0xe4, 0xd0, 0xd1, 0x47, 0x50, + 0x3f, 0xb7, 0x22, 0x1a, 0x4e, 0xad, 0xf0, 0x4c, 0x28, 0xd2, 0xd8, 0xbb, 0x9d, 0xa3, 0x7d, 0xa6, + 0x4e, 0x49, 0x8a, 0x88, 0x7e, 0x04, 0x35, 0x16, 0x59, 0xd1, 0x8c, 0x51, 0xd6, 0x34, 0xb6, 0x8d, + 0x76, 0x63, 0xef, 0x7e, 0x8e, 0x28, 0xb1, 0xc0, 0x40, 0x60, 0x91, 0x04, 0x1b, 0xb5, 0xe1, 0x86, + 0xed, 0x4f, 0x03, 0x3a, 0xa1, 0x11, 0x8d, 0x0f, 0x9b, 0x95, 0x6d, 0xad, 0x5d, 0x23, 0x45, 0x30, + 0xfa, 0x00, 0x0c, 0x1a, 0x86, 0xcd, 0xd5, 0x92, 0xfb, 0x90, 0x99, 0xe7, 0xb9, 0xde, 0xf8, 0x49, + 0x18, 0xfa, 0x21, 0xe1, 0x58, 0xd8, 0x82, 0x7a, 0xa2, 0x28, 0xc2, 0xdc, 0x24, 0xd4, 0x3e, 0x0b, + 0x7c, 0xd7, 0x8b, 0x8e, 0x99, 0x30, 0x49, 0x85, 0xe4, 0x60, 0xa8, 0x05, 0x10, 0x52, 0xe6, 0x4f, + 0x5e, 0x50, 0xe7, 0x98, 0x89, 0x8b, 0x57, 0x48, 0x06, 0x82, 0x4c, 0x30, 0x18, 0x7d, 0x2e, 0x1c, + 0x50, 0x21, 0x7c, 0x89, 0x7f, 0x03, 0x66, 0xd7, 0x65, 0x81, 0x15, 0xd9, 0xa7, 0x34, 0xdc, 0xb7, + 0x23, 0xd7, 0xf7, 0xd0, 0x07, 0x50, 0xb5, 0xc4, 0x4a, 0xc8, 0xd8, 0xdc, 0xbb, 0x99, 0x53, 0x33, + 0x46, 0x22, 0x12, 0x85, 0xbb, 0xbc, 0xe3, 0x4f, 0xa7, 0x6e, 0x94, 0x08, 0x4c, 0xf6, 0x68, 0x1b, + 0x1a, 0x3d, 0x36, 0x78, 0xe5, 0xd9, 0x47, 0x5c, 0x3f, 0x21, 0xb6, 0x46, 0xb2, 0x20, 0xdc, 0x01, + 0x63, 0xbf, 0x73, 0x98, 0x63, 0xa2, 0x5d, 0xcc, 0x44, 0x9f, 0x67, 0xf2, 0x5b, 0x1d, 0x6e, 0xf5, + 0xbc, 0x93, 0xc9, 0x8c, 0x7a, 0x36, 0x75, 0xd2, 0xeb, 0x30, 0xf4, 0x73, 0xd8, 0x48, 0x0e, 0x8e, + 0x5f, 0x05, 0x54, 0x5e, 0xe8, 0x5e, 0xee, 0x42, 0x39, 0x0c, 0x92, 0x27, 0x40, 0x9f, 0xc2, 0x46, + 0xca, 0xb0, 0xd7, 0xe5, 0x77, 0x34, 0xe6, 0x3c, 0x97, 0xc5, 0x20, 0x79, 0x7c, 0x91, 0x12, 0xf6, + 0x29, 0x9d, 0x5a, 0xbd, 0xae, 0x30, 0x80, 0x41, 0x92, 0x3d, 0x3a, 0x84, 0x9b, 0xf4, 0xa5, 0x3d, + 0x99, 0x39, 0x34, 0x43, 0xe3, 0x88, 0xd0, 0xb9, 0x50, 0x44, 0x19, 0x15, 0xfe, 0x8b, 0x96, 0x75, + 0xa5, 0x0c, 0xb7, 0x5f, 0xc1, 0x2d, 0xb7, 0xcc, 0x32, 0x32, 0xa1, 0x70, 0xb9, 0x21, 0xb2, 0x98, + 0xa4, 0x9c, 0x01, 0x7a, 0x9c, 0x04, 0x49, 0x9c, 0x5f, 0x0f, 0x16, 0xa8, 0x5b, 0x08, 0x17, 0x0c, + 0x86, 0x65, 0x9f, 0x09, 0x4b, 0x34, 0xf6, 0xcc, 0x7c, 0x60, 0x75, 0x0e, 0x09, 0x3f, 0xc4, 0xdf, + 0x68, 0xf0, 0x4e, 0xa6, 0x22, 0xb0, 0xc0, 0xf7, 0x18, 0xbd, 0x6e, 0x49, 0x78, 0x0a, 0xc8, 0x29, + 0x58, 0x87, 0x2a, 0x6f, 0x2e, 0xd2, 0x5d, 0xe6, 0x79, 0x09, 0x21, 0x7e, 0x09, 0x37, 0x3b, 0x99, + 0xcc, 0x7b, 0x4a, 0x19, 0xb3, 0xc6, 0xd7, 0x56, 0xb2, 0x98, 0xe3, 0xfa, 0x7c, 0x8e, 0xe3, 0xbf, + 0xe7, 0xfc, 0xdc, 0xf1, 0xbd, 0x13, 0x77, 0x8c, 0x76, 0xa0, 0xc2, 0x02, 0xcb, 0x93, 0xf2, 0x6e, + 0x97, 0x97, 0x2d, 0x22, 0x70, 0x78, 0xf9, 0x66, 0xbc, 0x28, 0x27, 0xfc, 0xd5, 0x96, 0x6b, 0xef, + 0x64, 0xe2, 0x4c, 0x7a, 0xe9, 0x82, 0x40, 0xcc, 0xa1, 0xf3, 0x50, 0x67, 0x2a, 0xd4, 0x2b, 0x71, + 0xa8, 0xab, 0x3d, 0xc2, 0xb0, 0x61, 0xcf, 0xc2, 0x90, 0x7a, 0xd1, 0x30, 0x70, 0x86, 0x11, 0x13, + 0x15, 0xb0, 0x42, 0x1a, 0x12, 0x78, 0xe4, 0x1c, 0x33, 0xfc, 0x37, 0x0d, 0xee, 0xf2, 0xdc, 0x70, + 0x66, 0x93, 0x4c, 0x68, 0x2f, 0xa9, 0x25, 0x3c, 0x86, 0xaa, 0x2d, 0x6c, 0x75, 0x49, 0xbc, 0xc6, + 0x06, 0x25, 0x12, 0x19, 0x75, 0x60, 0x93, 0x49, 0x95, 0xe2, 0x48, 0x16, 0x46, 0xd9, 0xdc, 0xdb, + 0xca, 0x91, 0x0f, 0x72, 0x28, 0xa4, 0x40, 0x82, 0x8f, 0xe0, 0xe6, 0x53, 0xcb, 0xf5, 0x22, 0xcb, + 0xf5, 0x68, 0xf8, 0xb9, 0xa2, 0x43, 0x3f, 0xce, 0xf4, 0x1b, 0xad, 0x24, 0x10, 0x53, 0x9a, 0x62, + 0xc3, 0xc1, 0x5f, 0xeb, 0x60, 0x16, 0x8f, 0xaf, 0x6b, 0xa1, 0x07, 0x00, 0x7c, 0x35, 0xe4, 0x42, + 0xa8, 0xb0, 0x52, 0x9d, 0xd4, 0x39, 0x84, 0xb3, 0xa7, 0xe8, 0x11, 0xac, 0xc6, 0x27, 0x65, 0x06, + 0xe8, 0xf8, 0xd3, 0xc0, 0xf7, 0xa8, 0x17, 0x09, 0x5c, 0x12, 0x63, 0xa2, 0xef, 0xc0, 0x46, 0x1a, + 0xba, 0xdc, 0xe9, 0x95, 0x92, 0x9e, 0x95, 0x74, 0x44, 0xe3, 0xf2, 0x8e, 0x88, 0xbe, 0x07, 0x9b, + 0x23, 0xdf, 0x8f, 0x58, 0x14, 0x5a, 0xc1, 0xd0, 0xf1, 0x3d, 0xda, 0xac, 0x8a, 0x7e, 0xb0, 0x91, + 0x40, 0xbb, 0xbe, 0x47, 0xf1, 0xc7, 0xb0, 0xd5, 0xf1, 0xfd, 0xd0, 0x71, 0x3d, 0x2b, 0xf2, 0xc3, + 0x03, 0x75, 0xa6, 0x42, 0xa9, 0x09, 0x6b, 0x2f, 0x68, 0xc8, 0x54, 0x87, 0x33, 0x88, 0xda, 0xe2, + 0x2f, 0xe1, 0x7e, 0x39, 0xa1, 0x2c, 0x42, 0xd7, 0x70, 0xd9, 0x9f, 0x34, 0x78, 0x77, 0xdf, 0x71, + 0x52, 0x0c, 0xa5, 0xcd, 0xf7, 0x41, 0x77, 0x9d, 0xcb, 0x9d, 0xa5, 0xbb, 0x0e, 0x9f, 0xa1, 0x32, + 0x41, 0xbc, 0x9e, 0x44, 0xe9, 0x9c, 0xa1, 0x8d, 0x12, 0x43, 0xb7, 0xc1, 0x74, 0xd9, 0xd0, 0xa3, + 0xe7, 0x43, 0xe1, 0x76, 0xce, 0x56, 0x4e, 0x29, 0x9b, 0x2e, 0xeb, 0xd3, 0xf3, 0x8e, 0x82, 0xe2, + 0x97, 0x70, 0x87, 0xd0, 0xa9, 0xff, 0x82, 0x5e, 0x4b, 0xd9, 0x26, 0xac, 0xd9, 0x16, 0xb3, 0x2d, + 0x87, 0xca, 0xa6, 0xad, 0xb6, 0xfc, 0x24, 0x14, 0xfc, 0x1d, 0x39, 0x13, 0xa8, 0x2d, 0xfe, 0xa3, + 0x0e, 0xf7, 0x52, 0xa1, 0x73, 0x8e, 0xbb, 0x66, 0x84, 0x2f, 0x32, 0xdf, 0x5d, 0xe1, 0xd5, 0x30, + 0x63, 0xb9, 0xa4, 0x24, 0xda, 0xf0, 0x7e, 0xc4, 0xeb, 0xe7, 0x30, 0x0a, 0xdd, 0xf1, 0x98, 0x86, + 0x43, 0xfa, 0x82, 0xd7, 0xb0, 0xb4, 0xee, 0x0d, 0xdd, 0x2b, 0x34, 0xec, 0x07, 0x82, 0xc7, 0x71, + 0xcc, 0xe2, 0x09, 0xe7, 0x90, 0x6d, 0xdd, 0xa5, 0x9e, 0x59, 0x2d, 0xf5, 0xcc, 0xbf, 0x35, 0xd8, + 0x2a, 0xb5, 0xcf, 0x72, 0x9a, 0xe4, 0x63, 0x58, 0xe5, 0x2d, 0x42, 0xf5, 0xc5, 0xf7, 0x72, 0x74, + 0x89, 0xb4, 0xb4, 0xa1, 0xc4, 0xd8, 0x2a, 0x85, 0x8d, 0xab, 0x0c, 0xb5, 0x57, 0x2a, 0x0a, 0xf8, + 0xbf, 0x1a, 0xb4, 0xd2, 0x7b, 0x1e, 0xf9, 0x2c, 0x5a, 0x76, 0x2c, 0x5c, 0xc9, 0xb1, 0xfa, 0x35, + 0x1d, 0xfb, 0x08, 0xd6, 0xe2, 0x0e, 0xa8, 0x1e, 0x14, 0x77, 0xe6, 0xda, 0xc6, 0xd4, 0xea, 0x79, + 0x27, 0x3e, 0x51, 0x78, 0xf8, 0x3f, 0x1a, 0xbc, 0xb7, 0xf0, 0xe6, 0xcb, 0xf1, 0xf2, 0xff, 0xe5, + 0xea, 0x6f, 0x13, 0x13, 0xf8, 0x25, 0x40, 0x6a, 0x8b, 0xdc, 0xc8, 0xac, 0x15, 0x46, 0xe6, 0x96, + 0xc2, 0xec, 0x5b, 0x53, 0xd5, 0xa4, 0x32, 0x10, 0xb4, 0x0b, 0x55, 0x11, 0x9e, 0xca, 0xe0, 0x25, + 0xa3, 0x90, 0xb0, 0xb7, 0xc4, 0xc2, 0x1d, 0xf9, 0xb0, 0x15, 0x82, 0x17, 0x3f, 0x6c, 0xef, 0x4b, + 0xb4, 0x8c, 0xd4, 0x14, 0x80, 0xff, 0xac, 0x03, 0x9a, 0xcf, 0x0e, 0x5e, 0x2b, 0x17, 0x38, 0x27, + 0x67, 0x48, 0x5d, 0x3e, 0x9c, 0xd5, 0x95, 0xf5, 0xc2, 0x95, 0xd5, 0x6c, 0x67, 0x5c, 0x61, 0xb6, + 0xfb, 0x05, 0x98, 0xb6, 0x6a, 0xc5, 0x43, 0x96, 0xbe, 0x44, 0x2f, 0xe9, 0xd7, 0x37, 0xec, 0xec, + 0x7e, 0xc6, 0xe6, 0x93, 0x74, 0xb5, 0xa4, 0xa1, 0x7c, 0x08, 0x8d, 0xd1, 0xc4, 0xb7, 0xcf, 0xe4, + 0xc4, 0x50, 0x15, 0xfa, 0xa1, 0x7c, 0x84, 0x0b, 0xf6, 0x20, 0xd0, 0xc4, 0x1a, 0x3f, 0x87, 0xdb, + 0x69, 0x78, 0x77, 0x26, 0x3e, 0xa3, 0x4b, 0x4a, 0xe8, 0x4c, 0x53, 0xd1, 0xf3, 0x4d, 0x25, 0x84, + 0x3b, 0x73, 0x22, 0x97, 0x93, 0x49, 0x7c, 0x94, 0x9e, 0xd9, 0x36, 0x65, 0x4c, 0xc9, 0x94, 0x5b, + 0xfc, 0x3b, 0x0d, 0xcc, 0xf4, 0x3d, 0x15, 0x07, 0xdb, 0x12, 0x9e, 0xa3, 0xf7, 0xa0, 0x26, 0x43, + 0x32, 0xae, 0xd1, 0x06, 0x49, 0xf6, 0x17, 0xbd, 0x34, 0xf1, 0x27, 0xb0, 0x2a, 0xf0, 0x2e, 0xf9, + 0x76, 0xb3, 0x20, 0x04, 0xb1, 0x07, 0x9b, 0x6a, 0x1d, 0x5b, 0xe3, 0x02, 0x3e, 0xdb, 0xd0, 0xf8, + 0x62, 0xe2, 0x14, 0x58, 0x65, 0x41, 0x1c, 0xa3, 0x4f, 0xcf, 0x0b, 0xba, 0x66, 0x41, 0xf8, 0x1b, + 0x03, 0x56, 0xe3, 0xa9, 0xf3, 0x3e, 0xd4, 0x7b, 0xec, 0x80, 0x87, 0x0f, 0x8d, 0xc7, 0x8e, 0x1a, + 0x49, 0x01, 0x5c, 0x0b, 0xb1, 0x4c, 0x9f, 0x32, 0x72, 0x8b, 0x3e, 0x85, 0x46, 0xbc, 0x54, 0xc5, + 0x60, 0x7e, 0xe6, 0x2f, 0xba, 0x87, 0x64, 0x29, 0xd0, 0x21, 0xbc, 0xd3, 0xa7, 0xd4, 0xe9, 0x86, + 0x7e, 0x10, 0x28, 0x0c, 0xd9, 0xe8, 0x2f, 0x61, 0x33, 0x4f, 0x87, 0x7e, 0x0a, 0x37, 0x38, 0x70, + 0xdf, 0x71, 0x12, 0x56, 0xf1, 0xbc, 0x8b, 0xe6, 0xb3, 0x99, 0x14, 0x51, 0xf9, 0x1b, 0xe4, 0x97, + 0x81, 0x63, 0x45, 0x54, 0x9a, 0x90, 0x35, 0xab, 0x82, 0x78, 0xab, 0xac, 0x99, 0x48, 0x07, 0x91, + 0x02, 0x49, 0xf1, 0x33, 0xca, 0xda, 0xdc, 0x67, 0x14, 0xf4, 0x43, 0x31, 0xe0, 0x8f, 0x69, 0xb3, + 0x26, 0xa2, 0x32, 0xdf, 0xaa, 0x0e, 0x64, 0x06, 0x8f, 0xe3, 0xe1, 0x7e, 0x4c, 0xf1, 0x19, 0xbc, + 0x9b, 0x54, 0x1f, 0x75, 0xca, 0x4b, 0xc7, 0x5b, 0x54, 0xbd, 0xb6, 0x7a, 0x52, 0xe8, 0x0b, 0x4b, + 0x47, 0x8c, 0x80, 0xff, 0xa9, 0xc1, 0x8d, 0xc2, 0xe7, 0xb7, 0xb7, 0x11, 0x54, 0x56, 0x16, 0xf5, + 0x65, 0x94, 0xc5, 0xb2, 0x39, 0xfb, 0x11, 0xdc, 0x8a, 0x1b, 0x2a, 0x73, 0xbf, 0xa2, 0xc3, 0x80, + 0x86, 0x43, 0x46, 0x6d, 0xdf, 0x8b, 0xc7, 0x44, 0x9d, 0x20, 0x71, 0x38, 0x70, 0xbf, 0xa2, 0x47, + 0x34, 0x1c, 0x88, 0x13, 0xfc, 0xb5, 0x06, 0x28, 0x63, 0xc3, 0x25, 0x55, 0xc4, 0xcf, 0x60, 0x63, + 0x94, 0x32, 0x4d, 0xbe, 0x76, 0xbc, 0x5f, 0xde, 0x41, 0xb2, 0xf2, 0xf3, 0x74, 0xd8, 0x81, 0xf5, + 0x6c, 0xcf, 0x46, 0x08, 0x2a, 0x91, 0x3b, 0x8d, 0xcb, 0x57, 0x9d, 0x88, 0x35, 0x87, 0x79, 0xbe, + 0xa3, 0x9a, 0xa3, 0x58, 0x73, 0x98, 0xcd, 0x61, 0x46, 0x0c, 0xe3, 0x6b, 0x9e, 0xb2, 0xd3, 0xf8, + 0x63, 0x89, 0xb0, 0x47, 0x9d, 0xa8, 0x2d, 0xfe, 0x08, 0xd6, 0xb3, 0x8e, 0xe3, 0xd4, 0xa7, 0xee, + 0xf8, 0x54, 0x7e, 0x10, 0x14, 0x6b, 0x64, 0x82, 0x31, 0xf1, 0xcf, 0x65, 0xb2, 0xf3, 0x25, 0x3e, + 0x81, 0xf5, 0xac, 0x09, 0xae, 0x46, 0x25, 0xb4, 0xe5, 0xad, 0x5c, 0x6a, 0xc6, 0xd7, 0xbc, 0xd4, + 0xf0, 0x5f, 0x16, 0x58, 0xb6, 0xd2, 0x2d, 0x05, 0xec, 0x3c, 0x80, 0xaa, 0xfc, 0x3c, 0x5a, 0x87, + 0xd5, 0x67, 0xa1, 0x1b, 0x51, 0x73, 0x05, 0xd5, 0xa0, 0x72, 0x64, 0x31, 0x66, 0x6a, 0x3b, 0xed, + 0xb8, 0x42, 0xa6, 0x8f, 0x7e, 0x04, 0x50, 0xed, 0x84, 0xd4, 0x12, 0x78, 0x00, 0xd5, 0xf8, 0x41, + 0x65, 0x6a, 0x3b, 0x3f, 0x01, 0x48, 0x93, 0x89, 0x73, 0xe8, 0x7f, 0xd1, 0x7f, 0x62, 0xae, 0xa0, + 0x06, 0xac, 0x3d, 0xdb, 0xef, 0x1d, 0xf7, 0xfa, 0x9f, 0x99, 0x9a, 0xd8, 0x90, 0x78, 0xa3, 0x73, + 0x9c, 0x2e, 0xc7, 0x31, 0x76, 0x7e, 0x50, 0x68, 0x20, 0x68, 0x0d, 0x8c, 0xfd, 0xc9, 0xc4, 0x5c, + 0x41, 0x55, 0xd0, 0xbb, 0x07, 0xa6, 0xc6, 0x25, 0xf5, 0xfd, 0x70, 0x6a, 0x4d, 0x4c, 0x7d, 0xe7, + 0x63, 0xd8, 0xcc, 0x07, 0xb4, 0x60, 0xeb, 0x87, 0x67, 0xae, 0x37, 0x8e, 0x05, 0x0e, 0x22, 0x51, + 0xa5, 0x62, 0x81, 0xb1, 0x86, 0x8e, 0xa9, 0x1f, 0xfc, 0xec, 0xaf, 0xaf, 0x5b, 0xda, 0xb7, 0xaf, + 0x5b, 0xda, 0xbf, 0x5e, 0xb7, 0xb4, 0xdf, 0xbf, 0x69, 0xad, 0x7c, 0xfb, 0xa6, 0xb5, 0xf2, 0x8f, + 0x37, 0xad, 0x95, 0x5f, 0x7f, 0x77, 0xec, 0x46, 0xa7, 0xb3, 0xd1, 0xae, 0xed, 0x4f, 0x1f, 0x06, + 0xae, 0x37, 0xb6, 0xad, 0xe0, 0x61, 0xe4, 0xda, 0x8e, 0xfd, 0x30, 0x13, 0x53, 0xa3, 0xaa, 0xf8, + 0x07, 0xe1, 0xc3, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x64, 0x4e, 0xef, 0x60, 0x18, 0x00, + 0x00, } func (m *TableSpan) Marshal() (dAtA []byte, err error) { @@ -3168,6 +3178,16 @@ func (m *MaintainerStatus) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.BootstrapDone { + i-- + if m.BootstrapDone { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x30 + } if len(m.Err) > 0 { for iNdEx := len(m.Err) - 1; iNdEx >= 0; iNdEx-- { { @@ -4680,6 +4700,9 @@ func (m *MaintainerStatus) Size() (n int) { n += 1 + l + sovHeartbeat(uint64(l)) } } + if m.BootstrapDone { + n += 2 + } return n } @@ -6939,6 +6962,26 @@ func (m *MaintainerStatus) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field BootstrapDone", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHeartbeat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.BootstrapDone = bool(v != 0) default: iNdEx = preIndex skippy, err := skipHeartbeat(dAtA[iNdEx:]) diff --git a/heartbeatpb/heartbeat.proto b/heartbeatpb/heartbeat.proto index f3b79ebbf3..17c06bb353 100644 --- a/heartbeatpb/heartbeat.proto +++ b/heartbeatpb/heartbeat.proto @@ -95,6 +95,7 @@ message MaintainerStatus { ComponentState state = 3; uint64 checkpoint_ts = 4; repeated RunningError err = 5; + bool bootstrap_done = 6; } message CoordinatorBootstrapRequest { diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 677dcd10d8..4772101811 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -263,8 +263,7 @@ func (p *persistentStorage) getAllPhysicalTables(snapTs uint64, tableFilter filt start := time.Now() defer func() { - log.Info("getAllPhysicalTables finish", - zap.Uint64("snapTs", snapTs), + log.Debug("getAllPhysicalTables finish", zap.Any("duration(s)", time.Since(start).Seconds())) }() return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 9858910146..0a7d6a9c4a 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/ticdc/utils/threadpool" - "github.com/pingcap/tiflow/cdc/model" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/oracle" "go.uber.org/atomic" @@ -82,14 +81,12 @@ type Maintainer struct { checkpointTsByCapture map[node.ID]heartbeatpb.Watermark - state atomic.Int32 - bootstrapper *bootstrap.Bootstrapper[heartbeatpb.MaintainerBootstrapResponse] - - changefeedSate model.FeedState + scheduleState atomic.Int32 + bootstrapper *bootstrap.Bootstrapper[heartbeatpb.MaintainerBootstrapResponse] removed *atomic.Bool - bootstrapped bool + bootstrapped atomic.Bool postBootstrapMsg *heartbeatpb.MaintainerPostBootstrapRequest // startCheckpointTs is the check point ts when the maintainer is created @@ -104,6 +101,8 @@ type Maintainer struct { // closedNodes is used to record the nodes that dispatcherManager is closed closedNodes map[node.ID]struct{} + // statusChanged is used to notify the maintainer manager to send heartbeat to coordinator + // to report the changefeed's status. statusChanged *atomic.Bool mutex sync.Mutex // protect nodeChanged and do onNodeChanged() @@ -204,9 +203,10 @@ func NewMaintainer(cfID common.ChangeFeedID, CheckpointTs: checkpointTs, ResolvedTs: checkpointTs, } - m.state.Store(int32(heartbeatpb.ComponentState_Working)) + m.scheduleState.Store(int32(heartbeatpb.ComponentState_Working)) m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.Name(), m.getNewBootstrapFn()) log.Info("changefeed maintainer is created", zap.String("id", cfID.String()), + zap.String("state", string(cfg.State)), zap.Uint64("checkpointTs", checkpointTs), zap.String("ddlDispatcherID", tableTriggerEventDispatcherID.String())) metrics.MaintainerGauge.WithLabelValues(cfID.Namespace(), cfID.Name()).Inc() @@ -259,7 +259,7 @@ func (m *Maintainer) HandleEvent(event *Event) bool { } m.handleEventDuration.Observe(duration.Seconds()) }() - if m.state.Load() == int32(heartbeatpb.ComponentState_Stopped) { + if m.scheduleState.Load() == int32(heartbeatpb.ComponentState_Stopped) { log.Warn("maintainer is stopped, ignore", zap.String("changefeed", m.id.String())) return false @@ -312,11 +312,11 @@ func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { } status := &heartbeatpb.MaintainerStatus{ - ChangefeedID: m.id.ToPB(), - FeedState: string(m.changefeedSate), - State: heartbeatpb.ComponentState(m.state.Load()), - CheckpointTs: m.getWatermark().CheckpointTs, - Err: runningErrors, + ChangefeedID: m.id.ToPB(), + State: heartbeatpb.ComponentState(m.scheduleState.Load()), + CheckpointTs: m.getWatermark().CheckpointTs, + Err: runningErrors, + BootstrapDone: m.bootstrapped.Load(), } return status } @@ -353,7 +353,6 @@ func (m *Maintainer) initialize() error { zap.String("id", m.id.String()), zap.String("status", common.FormatMaintainerStatus(m.GetMaintainerStatus())), zap.Duration("duration", time.Since(start))) - m.state.Store(int32(heartbeatpb.ComponentState_Working)) m.statusChanged.Store(true) return nil } @@ -409,7 +408,7 @@ func (m *Maintainer) onRemoveMaintainer(cascade, changefeedRemoved bool) { closed := m.tryCloseChangefeed() if closed { m.removed.Store(true) - m.state.Store(int32(heartbeatpb.ComponentState_Stopped)) + m.scheduleState.Store(int32(heartbeatpb.ComponentState_Stopped)) metrics.MaintainerGauge.WithLabelValues(m.id.Namespace(), m.id.Name()).Dec() } } @@ -452,7 +451,7 @@ func (m *Maintainer) onNodeChanged() { func (m *Maintainer) calCheckpointTs() { defer m.updateMetrics() - if !m.bootstrapped { + if !m.bootstrapped.Load() { log.Warn("can not advance checkpointTs since not bootstrapped", zap.String("changefeed", m.id.Name()), zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), @@ -516,7 +515,7 @@ func (m *Maintainer) updateMetrics() { lag = float64(oracle.GetPhysical(pdTime)-phyResolvedTs) / 1e3 m.changefeedResolvedTsLagGauge.Set(lag) - m.changefeedStatusGauge.Set(float64(m.state.Load())) + m.changefeedStatusGauge.Set(float64(m.scheduleState.Load())) } // send message to remote @@ -534,7 +533,7 @@ func (m *Maintainer) sendMessages(msgs []*messaging.TargetMessage) { func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) { // ignore the heartbeat if the maintianer not bootstrapped - if !m.bootstrapped { + if !m.bootstrapped.Load() { return } req := msg.Message[0].(*heartbeatpb.HeartBeatRequest) @@ -567,7 +566,7 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) { func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) { // the barrier is not initialized - if !m.bootstrapped { + if !m.bootstrapped.Load() { return } req := msg.Message[0].(*heartbeatpb.BlockStatusRequest) @@ -591,7 +590,7 @@ func (m *Maintainer) onMaintainerBootstrapResponse(msg *messaging.TargetMessage) m.onBootstrapDone(cachedResp) // when get the bootstrap response from the event dispatcher manager with table trigger event, - // set new changefeed to false, which means the changefeed is no longer new created. + // set new changefeed to false, which means the changefeed is not new created. if msg.From == m.selfNode.ID { m.newChangefeed = false } @@ -628,13 +627,16 @@ func (m *Maintainer) onBootstrapDone(cachedResp map[node.ID]*heartbeatpb.Maintai return } m.barrier = barrier - m.bootstrapped = true + m.bootstrapped.Store(true) // Memory Consumption is 64(tableName/schemaName limit) * 4(utf8.UTFMax) * 2(tableName+schemaName) * tableNum // For an extreme case(100w tables, and 64 utf8 characters for each name), the memory consumption is about 488MB. // For a normal case(100w tables, and 16 ascii characters for each name), the memory consumption is about 30MB. m.postBootstrapMsg = msg m.sendPostBootstrapRequest() + // set status changed to true, trigger the maintainer manager to send heartbeat to coordinator + // to report the this changefeed's status + m.statusChanged.Store(true) } func (m *Maintainer) sendPostBootstrapRequest() { @@ -669,7 +671,7 @@ func (m *Maintainer) handleResendMessage() { } func (m *Maintainer) tryCloseChangefeed() bool { - if m.state.Load() != int32(heartbeatpb.ComponentState_Stopped) { + if m.scheduleState.Load() != int32(heartbeatpb.ComponentState_Stopped) { m.statusChanged.Store(true) } if !m.cascadeRemoving { @@ -775,7 +777,7 @@ func (m *Maintainer) onPeriodTask() { } func (m *Maintainer) collectMetrics() { - if !m.bootstrapped { + if !m.bootstrapped.Load() { return } if time.Since(m.lastPrintStatusTime) > time.Second*20 { diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 8d03bc83df..13396c8118 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -103,25 +103,9 @@ func NewController(changefeedID common.ChangeFeedID, enableTableAcrossNodes: enableTableAcrossNodes, } s.schedulerController = NewScheduleController(changefeedID, batchSize, oc, replicaSetDB, nodeManager, balanceInterval, s.splitter) - go s.fizz() return s } -// fizz is only for test, remove it after test -func (c *Controller) fizz() { - log.Info("start fuzz") - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - for range ticker.C { - serviceGCSafepoint, err := c.pdAPIClient.ListGcServiceSafePoint(context.Background()) - if err != nil { - log.Error("failed to get gc safepoint", zap.Error(err)) - continue - } - log.Info("service gc safepoint", zap.Any("serviceGCSafepoint", serviceGCSafepoint)) - } -} - // HandleStatus handle the status report from the node func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) { for _, status := range statusList { diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 6cfed43dd0..8f1665fce5 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -33,6 +33,10 @@ import ( "go.uber.org/zap" ) +const ( + reportMaintainerStatusInterval = time.Millisecond * 1000 +) + // Manager is the manager of all changefeed maintainer in a ticdc watcher, each ticdc watcher will // start a Manager when the watcher is startup. It responsible for: // 1. Handle bootstrap command from coordinator and report all changefeed maintainer status. @@ -208,11 +212,11 @@ func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) { zap.Int64("version", m.coordinatorVersion)) } -func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) { +func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) *heartbeatpb.MaintainerStatus { cfID := common.NewChangefeedIDFromPB(req.Id) _, ok := m.maintainers.Load(cfID) if ok { - return + return nil } cfConfig := &config.ChangeFeedInfo{} @@ -226,15 +230,15 @@ func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) zap.Uint64("checkpointTs", req.CheckpointTs), zap.Any("config", cfConfig)) } - cf := NewMaintainer(cfID, m.conf, cfConfig, m.selfNode, m.taskScheduler, + maintainer := NewMaintainer(cfID, m.conf, cfConfig, m.selfNode, m.taskScheduler, m.pdAPI, m.tsoClient, m.regionCache, req.CheckpointTs, req.IsNewChangfeed) if err != nil { log.Warn("add path to dynstream failed, coordinator will retry later", zap.Error(err)) - return + return nil } - - cf.pushEvent(&Event{changefeedID: cfID, eventType: EventInit}) - m.maintainers.Store(cfID, cf) + maintainer.pushEvent(&Event{changefeedID: cfID, eventType: EventInit}) + m.maintainers.Store(cfID, maintainer) + return nil } func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heartbeatpb.MaintainerStatus { @@ -281,10 +285,11 @@ func (m *Manager) onDispatchMaintainerRequest( switch msg.Type { case messaging.TypeAddMaintainerRequest: req := msg.Message[0].(*heartbeatpb.AddMaintainerRequest) - m.onAddMaintainerRequest(req) + return m.onAddMaintainerRequest(req) case messaging.TypeRemoveMaintainerRequest: return m.onRemoveMaintainerRequest(msg) default: + log.Warn("unknown message type", zap.Any("message", msg.Message)) } return nil } @@ -294,7 +299,8 @@ func (m *Manager) sendHeartbeat() { response := &heartbeatpb.MaintainerHeartbeat{} m.maintainers.Range(func(key, value interface{}) bool { cfMaintainer := value.(*Maintainer) - if cfMaintainer.statusChanged.Load() || time.Since(cfMaintainer.lastReportTime) > time.Second*2 { + if cfMaintainer.statusChanged.Load() || + time.Since(cfMaintainer.lastReportTime) > reportMaintainerStatusInterval { mStatus := cfMaintainer.GetMaintainerStatus() response.Statuses = append(response.Statuses, mStatus) cfMaintainer.statusChanged.Store(false) diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index 5deb2b9fcd..f93679be15 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -241,7 +241,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { time.Sleep(5 * time.Second) require.Eventually(t, func() bool { - return maintainer.state.Load() == int32(heartbeatpb.ComponentState_Stopped) + return maintainer.scheduleState.Load() == int32(heartbeatpb.ComponentState_Stopped) }, 20*time.Second, 200*time.Millisecond) _, ok = manager.maintainers.Load(cfID) diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go index 7478ec8f75..ff44be8192 100644 --- a/pkg/txnutil/gc/gc_manager.go +++ b/pkg/txnutil/gc/gc_manager.go @@ -91,7 +91,10 @@ func (m *gcManager) TryUpdateGCSafePoint( actual = uint64(val.(int)) }) - log.Debug("update gc safe point", zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("actual", actual)) + log.Debug("update gc safe point", + zap.String("gcServiceID", m.gcServiceID), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("actual", actual)) if actual == checkpointTs { log.Info("update gc safe point success", zap.Uint64("gcSafePointTs", checkpointTs)) diff --git a/pkg/txnutil/gc/gc_service.go b/pkg/txnutil/gc/gc_service.go index 44ba6cd9cf..59ffc67983 100644 --- a/pkg/txnutil/gc/gc_service.go +++ b/pkg/txnutil/gc/gc_service.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" pd "github.com/tikv/pd/client" @@ -38,17 +39,44 @@ const ( // service GC safepoint and this function will update the service GC to startTs func EnsureChangefeedStartTsSafety( ctx context.Context, pdCli pd.Client, - gcServiceIDPrefix string, + ticdcServiceID string, + tag string, changefeedID common.ChangeFeedID, TTL int64, startTs uint64, ) error { + cdcGcTTL := config.GetGlobalServerConfig().GcTTL + // set gc safepoint for ticdc gc service first + // This is to ensure that the ticdc gc service gcSafepoint is set before + // the changefeed gc service gcSafepoint minServiceGCTs, err := SetServiceGCSafepoint( ctx, pdCli, - gcServiceIDPrefix+changefeedID.Namespace()+"_"+changefeedID.Name(), + ticdcServiceID, + cdcGcTTL, startTs) + if err != nil { + return errors.Trace(err) + } + log.Info("set gc safepoint for ticdc service", + zap.String("gcServiceID", ticdcServiceID), + zap.Uint64("expectedGCSafepoint", startTs), + zap.Uint64("actualGCSafepoint", minServiceGCTs), + zap.Int64("ttl", cdcGcTTL)) + + // set gc safepoint for the changefeed gc service + // set gc safepoint for the changefeed gc service + minServiceGCTs, err = SetServiceGCSafepoint( + ctx, pdCli, + ticdcServiceID+tag+changefeedID.Namespace()+"_"+changefeedID.Name(), TTL, startTs) if err != nil { return errors.Trace(err) } + + log.Info("set gc safepoint for changefeed", + zap.String("gcServiceID", ticdcServiceID+tag+changefeedID.Namespace()+"_"+changefeedID.Name()), + zap.Uint64("expectedGCSafepoint", startTs), + zap.Uint64("actualGCSafepoint", minServiceGCTs), + zap.Int64("ttl", TTL)) + // startTs should be greater than or equal to minServiceGCTs + 1, otherwise gcManager // would return a ErrSnapshotLostByGC even though the changefeed would appear to be successfully // created/resumed. See issue #6350 for more detail. diff --git a/server/module_election.go b/server/module_election.go index 3286fae9fd..e08c93f14e 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -133,7 +133,7 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { co := coordinator.New(e.svr.info, e.svr.pdClient, e.svr.PDClock, changefeed.NewEtcdBackend(e.svr.EtcdClient), - e.svr.EtcdClient.GetClusterID(), + e.svr.EtcdClient.GetGCServiceID(), coordinatorVersion, 10000, time.Minute) e.svr.setCoordinator(co) err = co.Run(ctx) From b14c51947e96392a7675f777de8fb0dc3da345fd Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 23 Jan 2025 16:18:31 +0800 Subject: [PATCH 5/5] *: adjust Signed-off-by: dongmen <414110582@qq.com> --- coordinator/coordinator.go | 7 ++++++- maintainer/maintainer_controller.go | 2 -- pkg/txnutil/gc/gc_service.go | 26 ++++---------------------- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 65ac357270..e80a0ef962 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -308,7 +308,12 @@ func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.Chang } func (c *coordinator) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error { - return c.controller.CreateChangefeed(ctx, info) + err := c.controller.CreateChangefeed(ctx, info) + if err != nil { + return errors.Trace(err) + } + // update gc safepoint after create changefeed + return c.updateGCSafepoint(ctx) } func (c *coordinator) RemoveChangefeed(ctx context.Context, id common.ChangeFeedID) (uint64, error) { diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 13396c8118..d9d6523678 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -52,7 +52,6 @@ type Controller struct { messageCenter messaging.MessageCenter nodeManager *watcher.NodeManager tsoClient replica.TSOClient - pdAPIClient pdutil.PDAPIClient splitter *split.Splitter enableTableAcrossNodes bool @@ -98,7 +97,6 @@ func NewController(changefeedID common.ChangeFeedID, taskScheduler: taskScheduler, cfConfig: cfConfig, tsoClient: tsoClient, - pdAPIClient: pdAPIClient, splitter: splitter, enableTableAcrossNodes: enableTableAcrossNodes, } diff --git a/pkg/txnutil/gc/gc_service.go b/pkg/txnutil/gc/gc_service.go index 59ffc67983..735c388251 100644 --- a/pkg/txnutil/gc/gc_service.go +++ b/pkg/txnutil/gc/gc_service.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" - "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" pd "github.com/tikv/pd/client" @@ -44,35 +43,18 @@ func EnsureChangefeedStartTsSafety( changefeedID common.ChangeFeedID, TTL int64, startTs uint64, ) error { - cdcGcTTL := config.GetGlobalServerConfig().GcTTL - // set gc safepoint for ticdc gc service first - // This is to ensure that the ticdc gc service gcSafepoint is set before - // the changefeed gc service gcSafepoint - minServiceGCTs, err := SetServiceGCSafepoint( - ctx, pdCli, - ticdcServiceID, - cdcGcTTL, startTs) - if err != nil { - return errors.Trace(err) - } - log.Info("set gc safepoint for ticdc service", - zap.String("gcServiceID", ticdcServiceID), - zap.Uint64("expectedGCSafepoint", startTs), - zap.Uint64("actualGCSafepoint", minServiceGCTs), - zap.Int64("ttl", cdcGcTTL)) + gcServiceID := ticdcServiceID + tag + changefeedID.Namespace() + "_" + changefeedID.Name() // set gc safepoint for the changefeed gc service - // set gc safepoint for the changefeed gc service - minServiceGCTs, err = SetServiceGCSafepoint( + minServiceGCTs, err := SetServiceGCSafepoint( ctx, pdCli, - ticdcServiceID+tag+changefeedID.Namespace()+"_"+changefeedID.Name(), + gcServiceID, TTL, startTs) if err != nil { return errors.Trace(err) } - log.Info("set gc safepoint for changefeed", - zap.String("gcServiceID", ticdcServiceID+tag+changefeedID.Namespace()+"_"+changefeedID.Name()), + zap.String("gcServiceID", gcServiceID), zap.Uint64("expectedGCSafepoint", startTs), zap.Uint64("actualGCSafepoint", minServiceGCTs), zap.Int64("ttl", TTL))