Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions cmd/cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"os"
"strings"
"time"

"github.com/fatih/color"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -104,22 +105,33 @@ func (o *options) run(cmd *cobra.Command) error {

util.LogHTTPProxies()

svr, err := server.New(o.serverConfig, o.pdEndpoints)
if err != nil {
log.Error("create cdc server failed", zap.Error(err))
return errors.Trace(err)
}
log.Info("TiCDC(new arch) server created",
zap.Strings("pd", o.pdEndpoints), zap.Stringer("config", o.serverConfig))

// Run TiCDC server.
err = svr.Run(ctx)
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("cdc server exits with error", zap.Error(err))
} else {
log.Info("cdc server exits normally")
for {
svr, err := server.New(o.serverConfig, o.pdEndpoints)
if err != nil {
log.Error("create cdc server failed", zap.Error(err))
return errors.Trace(err)
}
log.Info("TiCDC(new arch) server created",
zap.Strings("pd", o.pdEndpoints), zap.Stringer("config", o.serverConfig))

err = svr.Run(ctx)
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("cdc server exits with error", zap.Error(err))
} else {
log.Info("cdc server exits normally")
}
// Close the server
svr.Close(ctx)

if strings.Contains(err.Error(), "ErrCaptureSuicide") {
log.Info("server exit with capture suicide error, restart it again", zap.Error(err))
time.Sleep(1 * time.Second)
continue
}

break
}
svr.Close(ctx)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool

if counter > w.option.maxBatchSize {
log.Panic("Open Protocol max-batch-size exceeded",
zap.Int("max-batch-size", w.option.maxBatchSize), zap.Int("actual-batch-size", counter),
zap.Int("maxBatchSize", w.option.maxBatchSize), zap.Int("actualBatchSize", counter),
zap.Int32("partition", partition), zap.Any("offset", offset))
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/pulsar-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func NewPulsarConsumer(option *ConsumerOption) (pulsar.Consumer, pulsar.Client)
auth.ConfigParamScope: option.oauth2Scope,
auth.ConfigParamType: auth.ConfigParamTypeClientCredentials,
})
log.Info("oauth2 authentication is enabled", zap.String("issuer url", option.oauth2IssuerURL))
log.Info("oauth2 authentication is enabled", zap.String("issuerUrl", option.oauth2IssuerURL))
clientOption.Authentication = authentication
}
if len(option.mtlsAuthTLSCertificatePath) != 0 {
Expand Down
31 changes: 22 additions & 9 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,22 +304,27 @@ func newWriteTaskPool(store *eventStore, db *pebble.DB, ch *chann.UnlimitedChann
}
}

func (p *writeTaskPool) run(_ context.Context) {
func (p *writeTaskPool) run(ctx context.Context) {
p.store.wg.Add(p.workerNum)
for i := 0; i < p.workerNum; i++ {
go func() {
defer p.store.wg.Done()
buffer := make([]eventWithCallback, 0, 128)
for {
events, ok := p.dataCh.GetMultipleNoGroup(buffer)
if !ok {
select {
case <-ctx.Done():
return
default:
events, ok := p.dataCh.GetMultipleNoGroup(buffer)
if !ok {
return
}
p.store.writeEvents(p.db, events)
for i := range events {
events[i].callback()
}
buffer = buffer[:0]
}
p.store.writeEvents(p.db, events)
for i := range events {
events[i].callback()
}
buffer = buffer[:0]
}
}()
}
Expand All @@ -342,6 +347,10 @@ func (e *eventStore) Name() string {
}

func (e *eventStore) Run(ctx context.Context) error {
log.Info("event store start to run")
defer func() {
log.Info("event store exited")
}()
eg, ctx := errgroup.WithContext(ctx)

for _, p := range e.writeTaskPools {
Expand Down Expand Up @@ -369,13 +378,17 @@ func (e *eventStore) Run(ctx context.Context) error {
}

func (e *eventStore) Close(ctx context.Context) error {
e.wg.Wait()
log.Info("event store start to close")
defer log.Info("event store closed")

log.Info("closing pebble db")
for _, db := range e.dbs {
if err := db.Close(); err != nil {
log.Error("failed to close pebble db", zap.Error(err))
}
}
log.Info("pebble db closed")

return nil
}

Expand Down
8 changes: 0 additions & 8 deletions logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,6 @@ func newPersistentStorage(
dataStorage.initializeFromKVStorage(dbPath, storage, gcSafePoint)
}

go func() {
dataStorage.gc(ctx)
}()

go func() {
dataStorage.persistUpperBoundPeriodically(ctx)
}()

return dataStorage
}

Expand Down
15 changes: 14 additions & 1 deletion logservice/schemastore/schema_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,29 @@ func (s *schemaStore) Name() string {

func (s *schemaStore) Run(ctx context.Context) error {
log.Info("schema store begin to run")
defer func() {
log.Info("schema store exited")
}()

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return s.updateResolvedTsPeriodically(ctx)
})

eg.Go(func() error {
return s.dataStorage.gc(ctx)
})

eg.Go(func() error {
return s.dataStorage.persistUpperBoundPeriodically(ctx)
})

return eg.Wait()
}

func (s *schemaStore) Close(ctx context.Context) error {
log.Info("schema store closed")
log.Info("schema store start to close")
defer log.Info("schema store closed")
return s.dataStorage.close()
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/eventservice/event_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ func (s *eventService) Name() string {
}

func (s *eventService) Run(ctx context.Context) error {
log.Info("start event service")
log.Info("event service start to run")
defer func() {
log.Info("event service exited")
}()
for {
select {
case <-ctx.Done():
log.Info("event service exited")
return nil
case info := <-s.dispatcherInfo:
switch info.GetActionType() {
Expand Down
4 changes: 4 additions & 0 deletions pkg/messaging/message_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type messageCenter struct {
receiveEventCh chan *TargetMessage
receiveCmdCh chan *TargetMessage
g *errgroup.Group
ctx context.Context
cancel context.CancelFunc
}

Expand All @@ -127,6 +128,7 @@ func NewMessageCenter(
receiveEventCh: receiveEventCh,
receiveCmdCh: receiveCmdCh,
cancel: cancel,
ctx: ctx,
g: g,
router: newRouter(),
}
Expand Down Expand Up @@ -291,6 +293,7 @@ func (mc *messageCenter) touchRemoteTarget(id node.ID, epoch uint64, addr string
if !ok {
// If the target is not found, create a new one.
target = newRemoteMessageTarget(
mc.ctx,
mc.id, id, mc.epoch,
epoch, addr, mc.receiveEventCh,
mc.receiveCmdCh, mc.cfg, mc.security)
Expand Down Expand Up @@ -320,6 +323,7 @@ func (mc *messageCenter) touchRemoteTarget(id node.ID, epoch uint64, addr string
zap.Any("newAddr", addr))
target.close()
newTarget := newRemoteMessageTarget(
mc.ctx,
mc.id, id, mc.epoch,
epoch, addr, mc.receiveEventCh,
mc.receiveCmdCh, mc.cfg, mc.security)
Expand Down
7 changes: 6 additions & 1 deletion pkg/messaging/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (s *remoteMessageTarget) sendCommand(msg ...*TargetMessage) error {
}

func newRemoteMessageTarget(
ctx context.Context,
localID, targetId node.ID,
localEpoch, targetEpoch uint64,
addr string,
Expand All @@ -156,7 +157,7 @@ func newRemoteMessageTarget(
security *security.Credential,
) *remoteMessageTarget {
log.Info("Create remote target", zap.Stringer("local", localID), zap.Stringer("remote", targetId), zap.Any("addr", addr), zap.Any("localEpoch", localEpoch), zap.Any("targetEpoch", targetEpoch))
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
rt := &remoteMessageTarget{
messageCenterID: localID,
messageCenterEpoch: localEpoch,
Expand Down Expand Up @@ -209,6 +210,10 @@ func (s *remoteMessageTarget) runHandleErr(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Info("remoteMessageTarget exit",
zap.Any("messageCenterID", s.messageCenterID),
zap.Any("remote", s.targetId),
zap.Any("error", ctx.Err()))
return
case err := <-s.errCh:
switch err.Type {
Expand Down
4 changes: 3 additions & 1 deletion pkg/messaging/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package messaging

import (
"context"
"testing"

"github.com/pingcap/log"
Expand All @@ -26,9 +27,10 @@ import (
func newRemoteMessageTargetForTest() *remoteMessageTarget {
localId := node.NewID()
remoteId := node.NewID()
ctx := context.Background()
cfg := config.NewDefaultMessageCenterConfig()
receivedMsgCh := make(chan *TargetMessage, 1)
rt := newRemoteMessageTarget(localId, remoteId, 1, 1, "", receivedMsgCh, receivedMsgCh, cfg, nil)
rt := newRemoteMessageTarget(ctx, localId, remoteId, 1, 1, "", receivedMsgCh, receivedMsgCh, cfg, nil)
return rt
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/sink/codec/canal/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,7 @@ func TestCheckpointTs(t *testing.T) {
helper := pevent.NewEventTestHelper(t)
defer helper.Close()

protocolConfig :=
common.NewConfig(config.ProtocolCanalJSON)
protocolConfig := common.NewConfig(config.ProtocolCanalJSON)
encoder, err := NewJSONRowEventEncoder(context.Background(), protocolConfig)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/canal/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ package canal

import (
"fmt"
"github.com/pingcap/log"
"go.uber.org/zap"
"math"
"strconv"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/sink/codec/internal" // nolint:staticcheck
mm "github.com/pingcap/tidb/pkg/meta/model"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
canal "github.com/pingcap/tiflow/proto/canal"
"go.uber.org/zap"
)

func formatColumnValue(row *chunk.Row, idx int, columnInfo *timodel.ColumnInfo, flag *common.ColumnFlagType) (string, internal.JavaSQLType) {
Expand Down
10 changes: 7 additions & 3 deletions server/module_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ func (e *elector) campaignLogCoordinator(ctx context.Context) error {
zap.String("captureID", string(e.svr.info.ID)))

co := logcoordinator.New()
if err := co.Run(ctx); err != nil {
err = co.Run(ctx)

if err != nil && err != context.Canceled {
if !cerror.ErrNotOwner.Equal(err) {
if resignErr := e.resignLogCoordinaotr(); resignErr != nil {
return errors.Trace(resignErr)
Expand All @@ -254,8 +256,10 @@ func (e *elector) campaignLogCoordinator(ctx context.Context) error {
return errors.Trace(err)
}

log.Info("log coordinator resigned successfully",
zap.String("captureID", string(e.svr.info.ID)))
// If coordinator exits normally, continue the campaign loop and try to election coordinator again
log.Info("log coordinator exited normally",
zap.String("captureID", string(e.svr.info.ID)),
zap.String("error", err.Error()))
}
}

Expand Down
5 changes: 5 additions & 0 deletions server/module_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"net"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/messaging"
Expand All @@ -42,6 +43,10 @@ func NewGrpcServer(lis net.Listener) common.SubModule {
}

func (g *GrpcModule) Run(ctx context.Context) error {
log.Info("grpc server start to serve")
defer func() {
log.Info("grpc server exited")
}()
return g.grpcServer.Serve(g.lis)
}

Expand Down
Loading