Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=autorandom

# - name: Test availability
# if: ${{ success() }}
# run: |
# export TICDC_NEWARCH=true && make integration_test CASE=availability
- name: Test availability
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=availability

- name: Test bank
if: ${{ success() }}
Expand Down Expand Up @@ -282,7 +282,8 @@ jobs:
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=partition_table


# The 3rd case in this group
- name: Test multi_tables_ddl
if: ${{ success() }}
run: |
Expand Down
14 changes: 9 additions & 5 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand All @@ -52,6 +52,7 @@ type Controller struct {
operatorController *operator.Controller
changefeedDB *changefeed.ChangefeedDB
backend changefeed.Backend
eventCh *chann.DrainableChann[*Event]

bootstrapped *atomic.Bool
bootstrapper *bootstrap.Bootstrapper[heartbeatpb.CoordinatorBootstrapResponse]
Expand All @@ -60,7 +61,6 @@ type Controller struct {
nodeChanged bool
nodeManager *watcher.NodeManager

stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
taskScheduler threadpool.ThreadPool
taskHandlers []*threadpool.TaskHandle
messageCenter messaging.MessageCenter
Expand All @@ -85,7 +85,7 @@ func NewController(
updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed,
stateChangedCh chan *ChangefeedStateChangeEvent,
backend changefeed.Backend,
stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler],
eventCh *chann.DrainableChann[*Event],
taskScheduler threadpool.ThreadPool,
batchSize int, balanceInterval time.Duration,
) *Controller {
Expand All @@ -101,11 +101,11 @@ func NewController(
scheduler.BasicScheduler: scheduler.NewBasicScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, oc.NewAddMaintainerOperator),
scheduler.BalanceScheduler: scheduler.NewBalanceScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, balanceInterval, oc.NewMoveMaintainerOperator),
}),
eventCh: eventCh,
operatorController: oc,
messageCenter: mc,
changefeedDB: changefeedDB,
nodeManager: nodeManager,
stream: stream,
taskScheduler: taskScheduler,
backend: backend,
nodeChanged: false,
Expand Down Expand Up @@ -137,6 +137,10 @@ func NewController(

// HandleEvent implements the event-driven process mode
func (c *Controller) HandleEvent(event *Event) bool {
if event == nil {
return false
}

start := time.Now()
defer func() {
duration := time.Since(start)
Expand Down Expand Up @@ -514,7 +518,7 @@ func (c *Controller) RemoveNode(id node.ID) {

func (c *Controller) submitPeriodTask() {
task := func() time.Time {
c.stream.Push("coordinator", &Event{eventType: EventPeriod})
c.eventCh.In() <- &Event{eventType: EventPeriod}
return time.Now().Add(time.Millisecond * 500)
}
periodTaskhandler := c.taskScheduler.SubmitFunc(task, time.Now().Add(time.Millisecond*500))
Expand Down
73 changes: 50 additions & 23 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/coordinator/changefeed"
"github.com/pingcap/ticdc/pkg/common"
Expand All @@ -27,14 +28,16 @@ import (
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/server"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -52,18 +55,19 @@ type coordinator struct {
controller *Controller

mc messaging.MessageCenter
stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
taskScheduler threadpool.ThreadPool

gcManager gc.Manager
pdClient pd.Client
pdClock pdutil.Clock

eventCh *chann.DrainableChann[*Event]
updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed
stateChangedCh chan *ChangefeedStateChangeEvent
backend changefeed.Backend

cancel func()
closed atomic.Bool
}

func New(node *node.Info,
Expand All @@ -81,41 +85,53 @@ func New(node *node.Info,
nodeInfo: node,
lastTickTime: time.Now(),
gcManager: gc.NewManager(clusterID, pdClient, pdClock),
eventCh: chann.NewAutoDrainChann[*Event](),
pdClient: pdClient,
pdClock: pdClock,
mc: mc,
updatedChangefeedCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024),
stateChangedCh: make(chan *ChangefeedStateChangeEvent, 8),
backend: backend,
}
c.stream = dynstream.NewDynamicStream(NewStreamHandler())
c.stream.Start()
c.taskScheduler = threadpool.NewThreadPoolDefault()
c.closed.Store(false)

controller := NewController(
c.version,
c.nodeInfo,
c.updatedChangefeedCh,
c.stateChangedCh,
backend,
c.stream,
c.eventCh,
c.taskScheduler,
batchSize,
balanceCheckInterval,
)

c.controller = controller
if err := c.stream.AddPath("coordinator", controller); err != nil {
log.Panic("failed to add path",
zap.Error(err))
}

// receive messages
mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages)

nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)

nodeManager.RegisterOwnerChangeHandler(string(c.nodeInfo.ID), func(newCoordinatorID string) {
if newCoordinatorID != string(c.nodeInfo.ID) {
log.Info("Coordinator changed, and I am not the coordinator, stop myself",
zap.String("selfID", string(c.nodeInfo.ID)),
zap.String("newCoordinatorID", newCoordinatorID))
c.AsyncStop()
}
})

return c
}

// recvMessages is the handler registered for messaging.CoordinatorTopic. It
// wraps msg in an Event and forwards it for processing; it always returns nil.
//
// NOTE(review): this listing shows two delivery paths back to back (the
// dynstream push and the eventCh send), which looks like the before/after
// lines of a diff. Confirm which one applies before relying on either.
func (c *coordinator) recvMessages(_ context.Context, msg *messaging.TargetMessage) error {
c.stream.Push("coordinator", &Event{message: msg})
// Do not enqueue on eventCh once the coordinator has been marked closed.
if c.closed.Load() {
return nil
}
// NOTE(review): the closed check and this send are not atomic. Confirm that
// a send racing with CloseAndDrain in AsyncStop cannot hit a closed channel.
c.eventCh.In() <- &Event{message: msg}
return nil
}

Expand All @@ -133,6 +149,12 @@ func (c *coordinator) Run(ctx context.Context) error {
updateMetricsTicker := time.NewTicker(time.Second * 1)
defer updateMetricsTicker.Stop()

go c.runHandleEvent(ctx)

failpoint.Inject("coordinator-run-with-error", func() error {
return errors.New("coordinator run with error")
})

for {
select {
case <-ctx.Done():
Expand All @@ -153,8 +175,17 @@ func (c *coordinator) Run(ctx context.Context) error {
if err := c.handleStateChangedEvent(ctx, event); err != nil {
return errors.Trace(err)
}
case <-updateMetricsTicker.C:
c.updateMetricsOnce()
}
}
}

// runHandleEvent consumes events from eventCh and dispatches each one to the
// controller until ctx is cancelled or the channel is closed.
//
// It returns ctx.Err() on cancellation and nil once the output side of
// eventCh has been closed.
func (c *coordinator) runHandleEvent(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case event, ok := <-c.eventCh.Out():
			if !ok {
				// A closed channel is always ready and only yields nil
				// events; return instead of spinning until ctx is cancelled.
				return nil
			}
			c.controller.HandleEvent(event)
		}
	}
}
Expand Down Expand Up @@ -261,11 +292,13 @@ func shouldRunChangefeed(state model.FeedState) bool {
}

// AsyncStop tears the coordinator down: it deregisters the CoordinatorTopic
// message handler, stops the controller and the task scheduler, closes the
// event source, and calls c.cancel.
//
// NOTE(review): this listing contains an unguarded teardown sequence followed
// by a compare-and-swap guarded one, which looks like the before/after lines
// of a diff. As written, the unguarded sequence runs on every call; confirm
// which version is intended.
func (c *coordinator) AsyncStop() {
c.mc.DeRegisterHandler(messaging.CoordinatorTopic)
c.controller.Stop()
c.taskScheduler.Stop()
c.stream.Close()
c.cancel()
// Only the caller that flips closed from false to true runs the guarded
// teardown below; later calls skip it.
if c.closed.CompareAndSwap(false, true) {
c.mc.DeRegisterHandler(messaging.CoordinatorTopic)
c.controller.Stop()
c.taskScheduler.Stop()
c.eventCh.CloseAndDrain()
c.cancel()
}
}

func (c *coordinator) sendMessages(msgs []*messaging.TargetMessage) {
Expand Down Expand Up @@ -294,9 +327,3 @@ func (c *coordinator) updateGCSafepoint(
err := c.gcManager.TryUpdateGCSafePoint(ctx, gcSafepointUpperBound, false)
return errors.Trace(err)
}

// updateMetricsOnce takes one snapshot of the stream metrics and copies the
// event channel size and the pending queue length into their metrics.
func (c *coordinator) updateMetricsOnce() {
	stats := c.stream.GetMetrics()
	inputChanLen := float64(stats.EventChanSize)
	pendingQueueLen := float64(stats.PendingQueueLen)
	metricsDSInputChanLen.Set(inputChanLen)
	metricsDSPendingQueueLen.Set(pendingQueueLen)
}
31 changes: 25 additions & 6 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/messaging/proto"
"github.com/pingcap/ticdc/pkg/node"
Expand Down Expand Up @@ -230,9 +231,10 @@ func TestCoordinatorScheduling(t *testing.T) {
}()

ctx := context.Background()
nodeManager := watcher.NewNodeManager(nil, nil)
info := node.NewInfo("127.0.0.1:8700", "")
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
info := node.NewInfo("127.0.0.1:38300", "")
nodeManager.GetAliveNodes()[info.ID] = info
mc := messaging.NewMessageCenter(ctx,
info.ID, 100, config.NewDefaultMessageCenterConfig())
Expand Down Expand Up @@ -292,9 +294,10 @@ func TestCoordinatorScheduling(t *testing.T) {

func TestScaleNode(t *testing.T) {
ctx := context.Background()
nodeManager := watcher.NewNodeManager(nil, nil)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
info := node.NewInfo("127.0.0.1:28300", "")
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
nodeManager.GetAliveNodes()[info.ID] = info
mc1 := messaging.NewMessageCenter(ctx, info.ID, 0, config.NewDefaultMessageCenterConfig())
appcontext.SetService(appcontext.MessageCenter, mc1)
Expand Down Expand Up @@ -367,9 +370,10 @@ func TestScaleNode(t *testing.T) {

func TestBootstrapWithUnStoppedChangefeed(t *testing.T) {
ctx := context.Background()
nodeManager := watcher.NewNodeManager(nil, nil)
info := node.NewInfo("127.0.0.1:28301", "")
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
info := node.NewInfo("127.0.0.1:8700", "")
nodeManager.GetAliveNodes()[info.ID] = info
mc1 := messaging.NewMessageCenter(ctx, info.ID, 0, config.NewDefaultMessageCenterConfig())
appcontext.SetService(appcontext.MessageCenter, mc1)
Expand Down Expand Up @@ -485,3 +489,18 @@ func startMaintainerNode(ctx context.Context,
manager: maintainerM,
}
}

// mockEtcdClient is a test double that embeds etcd.CDCEtcdClient and overrides
// GetOwnerID to report a fixed owner ID.
type mockEtcdClient struct {
// ownerID is the value GetOwnerID returns.
ownerID string
// Embedded client; newMockEtcdClient leaves it at its zero value.
// NOTE(review): presumably an interface, so calling any method other than
// GetOwnerID on the mock would fail at runtime. Confirm.
etcd.CDCEtcdClient
}

// newMockEtcdClient builds a mockEtcdClient that reports ownerID as the owner.
// The embedded CDCEtcdClient is left at its zero value.
func newMockEtcdClient(ownerID string) *mockEtcdClient {
	client := new(mockEtcdClient)
	client.ownerID = ownerID
	return client
}

// GetOwnerID returns the owner ID configured at construction time, converted
// to model.CaptureID. It ignores ctx and never returns an error.
func (m *mockEtcdClient) GetOwnerID(ctx context.Context) (model.CaptureID, error) {
	owner := model.CaptureID(m.ownerID)
	return owner, nil
}
28 changes: 0 additions & 28 deletions coordinator/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package coordinator

import (
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/utils/dynstream"
)

const (
Expand All @@ -29,30 +28,3 @@ type Event struct {
eventType int
message *messaging.TargetMessage
}

// StreamHandler implements the dynstream Handler for the coordinator. It holds
// no state and carries no real logic: events are simply forwarded to the
// Controller.
type StreamHandler struct{}

// NewStreamHandler returns a new, stateless StreamHandler.
func NewStreamHandler() *StreamHandler {
	return new(StreamHandler)
}

// Path returns the fixed path "coordinator" for every event.
func (m *StreamHandler) Path(_ *Event) string {
	const coordinatorPath = "coordinator"
	return coordinatorPath
}

// Handle forwards a single event to dest.HandleEvent and returns its result.
// Batches are not supported: it panics unless exactly one event is passed.
func (m *StreamHandler) Handle(dest *Controller, events ...*Event) bool {
	// TODO: Support batch
	if count := len(events); count != 1 {
		panic("unexpected event count")
	}
	return dest.HandleEvent(events[0])
}

// GetSize reports a size of 0 for every event.
func (m *StreamHandler) GetSize(_ *Event) int {
	return 0
}

// GetArea reports area 0 for every path and controller.
func (m *StreamHandler) GetArea(_ string, _ *Controller) int {
	return 0
}

// GetTimestamp reports timestamp 0 for every event.
func (m *StreamHandler) GetTimestamp(_ *Event) dynstream.Timestamp {
	return 0
}

// GetType reports dynstream.DefaultEventType for every event.
func (m *StreamHandler) GetType(_ *Event) dynstream.EventType {
	return dynstream.DefaultEventType
}

// IsPaused always reports false.
func (m *StreamHandler) IsPaused(_ *Event) bool {
	return false
}

// OnDrop does nothing.
func (m *StreamHandler) OnDrop(_ *Event) {
}
27 changes: 0 additions & 27 deletions coordinator/helper_test.go

This file was deleted.

Loading