From 9fa37cdd17e1649840f67b8fdbdc6bd3805d8206 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 14 Jan 2025 11:44:06 +0800 Subject: [PATCH 1/7] test: add log to help debug Signed-off-by: dongmen <414110582@qq.com> --- pkg/sink/codec/canal/helper.go | 4 ++-- tests/integration_tests/resolve_lock/main.go | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/sink/codec/canal/helper.go b/pkg/sink/codec/canal/helper.go index 2b562f717d..aee2585c77 100644 --- a/pkg/sink/codec/canal/helper.go +++ b/pkg/sink/codec/canal/helper.go @@ -15,11 +15,10 @@ package canal import ( "fmt" - "github.com/pingcap/log" - "go.uber.org/zap" "math" "strconv" + "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/sink/codec/internal" // nolint:staticcheck mm "github.com/pingcap/tidb/pkg/meta/model" @@ -28,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" canal "github.com/pingcap/tiflow/proto/canal" + "go.uber.org/zap" ) func formatColumnValue(row *chunk.Row, idx int, columnInfo *timodel.ColumnInfo, flag *common.ColumnFlagType) (string, internal.JavaSQLType) { diff --git a/tests/integration_tests/resolve_lock/main.go b/tests/integration_tests/resolve_lock/main.go index bf8726f8ab..03074bd35a 100644 --- a/tests/integration_tests/resolve_lock/main.go +++ b/tests/integration_tests/resolve_lock/main.go @@ -134,7 +134,12 @@ func addLock(ctx context.Context, cfg *util.Config) error { pdcli: pdcli, kv: store.(tikv.Storage), } - return errors.Trace(locker.generateLocks(ctx, 10*time.Second)) + err = locker.generateLocks(ctx, 10*time.Second) + if err != nil { + return errors.Trace(err) + } + log.Info("generateLocks done") + return nil } // getTableID of the table with specified table name.
From faabd3747218d9bab84ba624debfdb61b54e85dc Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 14 Jan 2025 13:17:10 +0800 Subject: [PATCH 2/7] test: skip resolve lock cli case Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/cli_with_auth/run.sh | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integration_tests/cli_with_auth/run.sh b/tests/integration_tests/cli_with_auth/run.sh index 22ed39c28b..25f430a05d 100644 --- a/tests/integration_tests/cli_with_auth/run.sh +++ b/tests/integration_tests/cli_with_auth/run.sh @@ -146,17 +146,19 @@ EOF run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai" fi - # Smoke test unsafe commands + # Test unsafe commands echo "y" | run_cdc_cli unsafe delete-service-gc-safepoint run_cdc_cli unsafe reset --no-confirm --pd=$pd_addr REGION_ID=$(pd-ctl -u=$pd_addr region | jq '.regions[0].id') TS=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) # wait for owner online sleep 3 - run_cdc_cli unsafe resolve-lock --region=$REGION_ID - run_cdc_cli unsafe resolve-lock --region=$REGION_ID --ts=$TS - # Smoke test change log level + # Fixme: uncomment this after we fix: https://github.com/pingcap/ticdc/issues/866 + # run_cdc_cli unsafe resolve-lock --region=$REGION_ID + # run_cdc_cli unsafe resolve-lock --region=$REGION_ID --ts=$TS + + # Test change log level curl -X POST -d '"warn"' http://127.0.0.1:8300/api/v2/log sleep 3 # make sure TiCDC does not panic From 65fa8f7280da5b39f2caae6c9925f95e72e9f3b3 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 14 Jan 2025 13:35:28 +0800 Subject: [PATCH 3/7] test: add check Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/cli_with_auth/run.sh | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests/cli_with_auth/run.sh b/tests/integration_tests/cli_with_auth/run.sh index 25f430a05d..6d513dee8a 100644 
--- a/tests/integration_tests/cli_with_auth/run.sh +++ b/tests/integration_tests/cli_with_auth/run.sh @@ -152,7 +152,21 @@ EOF REGION_ID=$(pd-ctl -u=$pd_addr region | jq '.regions[0].id') TS=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) # wait for owner online - sleep 3 + sleep 5 + + # Check if the owner is online + for i in {1..100}; do + run_cdc_cli capture list | grep -q "\"is-owner\": true" + if [[ $? -eq 0 ]]; then + break + fi + sleep 1 + done + if [[ $? -ne 0 ]]; then + echo "[$(date)] <<<<< owner is not online >>>>>" + run_cdc_cli capture list + exit 1 + fi # Fixme: uncomment this after we fix: https://github.com/pingcap/ticdc/issues/866 # run_cdc_cli unsafe resolve-lock --region=$REGION_ID From 6dda0c44df015f8dc8945fb170e303d87100971d Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 14 Jan 2025 15:46:44 +0800 Subject: [PATCH 4/7] *: fix server exit stuck Signed-off-by: dongmen <414110582@qq.com> --- cmd/cdc/server/server.go | 42 ++++++++++++------- cmd/kafka-consumer/writer.go | 2 +- cmd/pulsar-consumer/main.go | 2 +- logservice/eventstore/event_store.go | 31 ++++++++++---- logservice/schemastore/persist_storage.go | 8 ---- logservice/schemastore/schema_store.go | 15 ++++++- pkg/eventservice/event_service.go | 6 ++- pkg/messaging/message_center.go | 4 ++ pkg/messaging/target.go | 7 +++- pkg/messaging/target_test.go | 4 +- pkg/sink/codec/canal/codec_test.go | 3 +- server/module_election.go | 10 +++-- server/module_grpc.go | 5 +++ server/server.go | 31 ++++++++------ server/watcher/module_node_manager.go | 6 ++- tests/integration_tests/_utils/check_logs | 1 - .../integration_tests/availability/capture.sh | 12 +++--- tests/integration_tests/availability/owner.sh | 15 ++++--- tests/integration_tests/availability/run.sh | 12 +++--- tests/integration_tests/cli_with_auth/run.sh | 11 +++-- tests/integration_tests/http_api/run.sh | 5 +-- 21 files changed, 147 insertions(+), 85 deletions(-) diff --git a/cmd/cdc/server/server.go 
b/cmd/cdc/server/server.go index a3b544c483..b184ee5b8a 100644 --- a/cmd/cdc/server/server.go +++ b/cmd/cdc/server/server.go @@ -17,6 +17,7 @@ import ( "context" "os" "strings" + "time" "github.com/fatih/color" "github.com/pingcap/errors" @@ -104,22 +105,33 @@ func (o *options) run(cmd *cobra.Command) error { util.LogHTTPProxies() - svr, err := server.New(o.serverConfig, o.pdEndpoints) - if err != nil { - log.Error("create cdc server failed", zap.Error(err)) - return errors.Trace(err) - } - log.Info("TiCDC(new arch) server created", - zap.Strings("pd", o.pdEndpoints), zap.Stringer("config", o.serverConfig)) - - // Run TiCDC server. - err = svr.Run(ctx) - if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("cdc server exits with error", zap.Error(err)) - } else { - log.Info("cdc server exits normally") + for { + svr, err := server.New(o.serverConfig, o.pdEndpoints) + if err != nil { + log.Error("create cdc server failed", zap.Error(err)) + return errors.Trace(err) + } + log.Info("TiCDC(new arch) server created", + zap.Strings("pd", o.pdEndpoints), zap.Stringer("config", o.serverConfig)) + + err = svr.Run(ctx) + if err != nil && errors.Cause(err) != context.Canceled { + log.Warn("cdc server exits with error", zap.Error(err)) + } else { + log.Info("cdc server exits normally") + } + // Close the server + svr.Close(ctx) + + if err != nil && strings.Contains(err.Error(), "ErrCaptureSuicide") { + log.Info("server exit with capture suicide error, restart it again", zap.Error(err)) + time.Sleep(1 * time.Second) + continue + } + + break } - svr.Close(ctx) + return nil } diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index a40c85500e..5e4158c1a4 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -397,7 +397,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool if counter > w.option.maxBatchSize { log.Panic("Open Protocol max-batch-size exceeded", - zap.Int("max-batch-size",
w.option.maxBatchSize), zap.Int("actual-batch-size", counter), + zap.Int("maxBatchSize", w.option.maxBatchSize), zap.Int("actualBatchSize", counter), zap.Int32("partition", partition), zap.Any("offset", offset)) } diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go index 1159b04409..0e3e9f3425 100644 --- a/cmd/pulsar-consumer/main.go +++ b/cmd/pulsar-consumer/main.go @@ -284,7 +284,7 @@ func NewPulsarConsumer(option *ConsumerOption) (pulsar.Consumer, pulsar.Client) auth.ConfigParamScope: option.oauth2Scope, auth.ConfigParamType: auth.ConfigParamTypeClientCredentials, }) - log.Info("oauth2 authentication is enabled", zap.String("issuer url", option.oauth2IssuerURL)) + log.Info("oauth2 authentication is enabled", zap.String("issuerUrl", option.oauth2IssuerURL)) clientOption.Authentication = authentication } if len(option.mtlsAuthTLSCertificatePath) != 0 { diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 8cbb240b73..4e97f692c0 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -304,22 +304,27 @@ func newWriteTaskPool(store *eventStore, db *pebble.DB, ch *chann.UnlimitedChann } } -func (p *writeTaskPool) run(_ context.Context) { +func (p *writeTaskPool) run(ctx context.Context) { p.store.wg.Add(p.workerNum) for i := 0; i < p.workerNum; i++ { go func() { defer p.store.wg.Done() buffer := make([]eventWithCallback, 0, 128) for { - events, ok := p.dataCh.GetMultipleNoGroup(buffer) - if !ok { + select { + case <-ctx.Done(): return + default: + events, ok := p.dataCh.GetMultipleNoGroup(buffer) + if !ok { + return + } + p.store.writeEvents(p.db, events) + for i := range events { + events[i].callback() + } + buffer = buffer[:0] } - p.store.writeEvents(p.db, events) - for i := range events { - events[i].callback() - } - buffer = buffer[:0] } }() } @@ -342,6 +347,10 @@ func (e *eventStore) Name() string { } func (e *eventStore) Run(ctx context.Context) error { + 
log.Info("event store start to run") + defer func() { + log.Info("event store exited") + }() eg, ctx := errgroup.WithContext(ctx) for _, p := range e.writeTaskPools { @@ -369,13 +378,17 @@ func (e *eventStore) Run(ctx context.Context) error { } func (e *eventStore) Close(ctx context.Context) error { - e.wg.Wait() + log.Info("event store start to close") + defer log.Info("event store closed") + log.Info("closing pebble db") for _, db := range e.dbs { if err := db.Close(); err != nil { log.Error("failed to close pebble db", zap.Error(err)) } } + log.Info("pebble db closed") + return nil } diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 3348b5a60c..e0bbd36447 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -181,14 +181,6 @@ func newPersistentStorage( dataStorage.initializeFromKVStorage(dbPath, storage, gcSafePoint) } - go func() { - dataStorage.gc(ctx) - }() - - go func() { - dataStorage.persistUpperBoundPeriodically(ctx) - }() - return dataStorage } diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index 64a5ff78d1..388277be35 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -130,16 +130,29 @@ func (s *schemaStore) Name() string { func (s *schemaStore) Run(ctx context.Context) error { log.Info("schema store begin to run") + defer func() { + log.Info("schema store exited") + }() + eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { return s.updateResolvedTsPeriodically(ctx) }) + eg.Go(func() error { + return s.dataStorage.gc(ctx) + }) + + eg.Go(func() error { + return s.dataStorage.persistUpperBoundPeriodically(ctx) + }) + return eg.Wait() } func (s *schemaStore) Close(ctx context.Context) error { - log.Info("schema store closed") + log.Info("schema store start to close") + defer log.Info("schema store closed") return s.dataStorage.close() } diff --git 
a/pkg/eventservice/event_service.go b/pkg/eventservice/event_service.go index 08bc5c4b15..c792fdbdae 100644 --- a/pkg/eventservice/event_service.go +++ b/pkg/eventservice/event_service.go @@ -85,11 +85,13 @@ func (s *eventService) Name() string { } func (s *eventService) Run(ctx context.Context) error { - log.Info("start event service") + log.Info("event service start to run") + defer func() { + log.Info("event service exited") + }() for { select { case <-ctx.Done(): - log.Info("event service exited") return nil case info := <-s.dispatcherInfo: switch info.GetActionType() { diff --git a/pkg/messaging/message_center.go b/pkg/messaging/message_center.go index 4359b7a072..ea44e8cbe4 100644 --- a/pkg/messaging/message_center.go +++ b/pkg/messaging/message_center.go @@ -103,6 +103,7 @@ type messageCenter struct { receiveEventCh chan *TargetMessage receiveCmdCh chan *TargetMessage g *errgroup.Group + ctx context.Context cancel context.CancelFunc } @@ -127,6 +128,7 @@ func NewMessageCenter( receiveEventCh: receiveEventCh, receiveCmdCh: receiveCmdCh, cancel: cancel, + ctx: ctx, g: g, router: newRouter(), } @@ -291,6 +293,7 @@ func (mc *messageCenter) touchRemoteTarget(id node.ID, epoch uint64, addr string if !ok { // If the target is not found, create a new one. 
target = newRemoteMessageTarget( + mc.ctx, mc.id, id, mc.epoch, epoch, addr, mc.receiveEventCh, mc.receiveCmdCh, mc.cfg, mc.security) @@ -320,6 +323,7 @@ func (mc *messageCenter) touchRemoteTarget(id node.ID, epoch uint64, addr string zap.Any("newAddr", addr)) target.close() newTarget := newRemoteMessageTarget( + mc.ctx, mc.id, id, mc.epoch, epoch, addr, mc.receiveEventCh, mc.receiveCmdCh, mc.cfg, mc.security) diff --git a/pkg/messaging/target.go b/pkg/messaging/target.go index 3860d62cdd..3c6e8abb1d 100644 --- a/pkg/messaging/target.go +++ b/pkg/messaging/target.go @@ -148,6 +148,7 @@ func (s *remoteMessageTarget) sendCommand(msg ...*TargetMessage) error { } func newRemoteMessageTarget( + ctx context.Context, localID, targetId node.ID, localEpoch, targetEpoch uint64, addr string, @@ -156,7 +157,7 @@ func newRemoteMessageTarget( security *security.Credential, ) *remoteMessageTarget { log.Info("Create remote target", zap.Stringer("local", localID), zap.Stringer("remote", targetId), zap.Any("addr", addr), zap.Any("localEpoch", localEpoch), zap.Any("targetEpoch", targetEpoch)) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) rt := &remoteMessageTarget{ messageCenterID: localID, messageCenterEpoch: localEpoch, @@ -209,6 +210,10 @@ func (s *remoteMessageTarget) runHandleErr(ctx context.Context) { for { select { case <-ctx.Done(): + log.Info("remoteMessageTarget exit", + zap.Any("messageCenterID", s.messageCenterID), + zap.Any("remote", s.targetId), + zap.Any("error", ctx.Err())) return case err := <-s.errCh: switch err.Type { diff --git a/pkg/messaging/target_test.go b/pkg/messaging/target_test.go index 301e3ea726..e958202c4b 100644 --- a/pkg/messaging/target_test.go +++ b/pkg/messaging/target_test.go @@ -14,6 +14,7 @@ package messaging import ( + "context" "testing" "github.com/pingcap/log" @@ -26,9 +27,10 @@ import ( func newRemoteMessageTargetForTest() *remoteMessageTarget { localId := node.NewID() remoteId := 
node.NewID() + ctx := context.Background() cfg := config.NewDefaultMessageCenterConfig() receivedMsgCh := make(chan *TargetMessage, 1) - rt := newRemoteMessageTarget(localId, remoteId, 1, 1, "", receivedMsgCh, receivedMsgCh, cfg, nil) + rt := newRemoteMessageTarget(ctx, localId, remoteId, 1, 1, "", receivedMsgCh, receivedMsgCh, cfg, nil) return rt } diff --git a/pkg/sink/codec/canal/codec_test.go b/pkg/sink/codec/canal/codec_test.go index 857f4f35d5..397efb3f3c 100644 --- a/pkg/sink/codec/canal/codec_test.go +++ b/pkg/sink/codec/canal/codec_test.go @@ -760,8 +760,7 @@ func TestCheckpointTs(t *testing.T) { helper := pevent.NewEventTestHelper(t) defer helper.Close() - protocolConfig := - common.NewConfig(config.ProtocolCanalJSON) + protocolConfig := common.NewConfig(config.ProtocolCanalJSON) encoder, err := NewJSONRowEventEncoder(context.Background(), protocolConfig) require.NoError(t, err) diff --git a/server/module_election.go b/server/module_election.go index ec51e9985d..60ea862fcb 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -242,7 +242,9 @@ func (e *elector) campaignLogCoordinator(ctx context.Context) error { zap.String("captureID", string(e.svr.info.ID))) co := logcoordinator.New() - if err := co.Run(ctx); err != nil { + err = co.Run(ctx) + + if err != nil && err != context.Canceled { if !cerror.ErrNotOwner.Equal(err) { if resignErr := e.resignLogCoordinaotr(); resignErr != nil { return errors.Trace(resignErr) @@ -254,8 +256,10 @@ func (e *elector) campaignLogCoordinator(ctx context.Context) error { return errors.Trace(err) } - log.Info("log coordinator resigned successfully", - zap.String("captureID", string(e.svr.info.ID))) + // The coordinator exited normally (err is nil or context.Canceled); continue the loop and campaign for the coordinator role again + log.Info("log coordinator exited normally", + zap.String("captureID", string(e.svr.info.ID)), + zap.Error(err)) } } diff --git a/server/module_grpc.go b/server/module_grpc.go index
cd830ba4a3..fc149851c3 100644 --- a/server/module_grpc.go +++ b/server/module_grpc.go @@ -17,6 +17,7 @@ import ( "context" "net" + "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/messaging" @@ -42,6 +43,10 @@ func NewGrpcServer(lis net.Listener) common.SubModule { } func (g *GrpcModule) Run(ctx context.Context) error { + log.Info("grpc server start to serve") + defer func() { + log.Info("grpc server exited") + }() return g.grpcServer.Serve(g.lis) } diff --git a/server/server.go b/server/server.go index 6dc86f3e5d..13d23a9c50 100644 --- a/server/server.go +++ b/server/server.go @@ -156,14 +156,6 @@ func (c *server) initialize(ctx context.Context) error { for _, subModule := range c.subModules { appctx.SetService(subModule.Name(), subModule) } - // start tcp server - go func() { - err := c.tcpServer.Run(ctx) - if err != nil { - log.Error("tcp server exist", zap.Error(cerror.Trace(err))) - } - }() - log.Info("server initialized", zap.Any("server", c.info)) return nil } @@ -174,16 +166,25 @@ func (c *server) Run(ctx context.Context) error { log.Error("init server failed", zap.Error(err)) return errors.Trace(err) } - defer func() { - c.Close(ctx) - }() g, ctx := errgroup.WithContext(ctx) + // start tcp server + g.Go(func() error { + log.Info("tcp server start to run") + err := c.tcpServer.Run(ctx) + if err != nil { + log.Error("tcp server exited", zap.Error(cerror.Trace(err))) + } + return nil + }) + + log.Info("server initialized", zap.Any("server", c.info)) // start all submodules for _, sub := range c.subModules { func(m common.SubModule) { g.Go(func() error { log.Info("starting sub module", zap.String("module", m.Name())) + defer log.Info("sub module exited", zap.String("module", m.Name())) return m.Run(ctx) }) }(sub) @@ -193,7 +194,12 @@ func (c *server) Run(ctx context.Context) error { if err != nil { return errors.Trace(err) } - return errors.Trace(g.Wait()) + + 
err = g.Wait() + if err != nil { + log.Error("server exited", zap.Error(cerror.Trace(err))) + } + return errors.Trace(err) } // SelfInfo gets the server info @@ -239,6 +245,7 @@ func (c *server) Close(ctx context.Context) { zap.String("watcher", subModule.Name()), zap.Error(err)) } + log.Info("sub module closed", zap.String("module", subModule.Name())) } // delete server info from etcd diff --git a/server/watcher/module_node_manager.go b/server/watcher/module_node_manager.go index 557ae4cca4..7564068274 100644 --- a/server/watcher/module_node_manager.go +++ b/server/watcher/module_node_manager.go @@ -31,8 +31,10 @@ import ( const NodeManagerName = "node-manager" -type NodeChangeHandler func(map[node.ID]*node.Info) -type OwnerChangeHandler func(newOwnerKeys string) +type ( + NodeChangeHandler func(map[node.ID]*node.Info) + OwnerChangeHandler func(newOwnerKeys string) +) // NodeManager manager the read view of all captures, other modules can get the captures information from it // and register server update event handler diff --git a/tests/integration_tests/_utils/check_logs b/tests/integration_tests/_utils/check_logs index d56267f5c1..d7fe24c972 100755 --- a/tests/integration_tests/_utils/check_logs +++ b/tests/integration_tests/_utils/check_logs @@ -31,4 +31,3 @@ else echo "no DATA RACE found" exit 0 fi - diff --git a/tests/integration_tests/availability/capture.sh b/tests/integration_tests/availability/capture.sh index fc327f1e9d..910ade0eda 100755 --- a/tests/integration_tests/availability/capture.sh +++ b/tests/integration_tests/availability/capture.sh @@ -1,12 +1,12 @@ #!/bin/bash error_handler() { - local line_no=$1 - local error_code=$2 - local last_command="${BASH_COMMAND}" - echo -e "\033[31mError occurred in script $0 at line $line_no" - echo -e "Error code: $error_code" - echo -e "Failed command: $last_command\033[0m" + local line_no=$1 + local error_code=$2 + local last_command="${BASH_COMMAND}" + echo -e "\033[31mError occurred in script $0 at line 
$line_no" + echo -e "Error code: $error_code" + echo -e "Failed command: $last_command\033[0m" } # Set error handler diff --git a/tests/integration_tests/availability/owner.sh b/tests/integration_tests/availability/owner.sh index 63165d4b3e..73cb7564a6 100755 --- a/tests/integration_tests/availability/owner.sh +++ b/tests/integration_tests/availability/owner.sh @@ -1,12 +1,12 @@ #!/bin/bash error_handler() { - local line_no=$1 - local error_code=$2 - local last_command="${BASH_COMMAND}" - echo -e "\033[31mError occurred in script $0 at line $line_no" - echo -e "Error code: $error_code" - echo -e "Failed command: $last_command\033[0m" + local line_no=$1 + local error_code=$2 + local last_command="${BASH_COMMAND}" + echo -e "\033[31mError occurred in script $0 at line $line_no" + echo -e "Error code: $error_code" + echo -e "Failed command: $last_command\033[0m" } # Set error handler @@ -146,7 +146,6 @@ function test_owner_retryable_error() { cleanup_process $CDC_BINARY } - # make sure when owner key in etcd is deleted, the owner will resign, # and only one owner exists in the cluster at the same time. 
function test_delete_owner_key() { @@ -189,4 +188,4 @@ function test_delete_owner_key() { export GO_FAILPOINTS='' echo "delete_owner_key pass" cleanup_process $CDC_BINARY -} \ No newline at end of file +} diff --git a/tests/integration_tests/availability/run.sh b/tests/integration_tests/availability/run.sh index c269767f73..893819477e 100644 --- a/tests/integration_tests/availability/run.sh +++ b/tests/integration_tests/availability/run.sh @@ -1,12 +1,12 @@ #!/bin/bash error_handler() { - local line_no=$1 - local error_code=$2 - local last_command="${BASH_COMMAND}" - echo -e "\033[31mError occurred in script $0 at line $line_no" - echo -e "Error code: $error_code" - echo -e "Failed command: $last_command\033[0m" + local line_no=$1 + local error_code=$2 + local last_command="${BASH_COMMAND}" + echo -e "\033[31mError occurred in script $0 at line $line_no" + echo -e "Error code: $error_code" + echo -e "Failed command: $last_command\033[0m" } # Set error handler diff --git a/tests/integration_tests/cli_with_auth/run.sh b/tests/integration_tests/cli_with_auth/run.sh index 6d513dee8a..079ba9cd4f 100644 --- a/tests/integration_tests/cli_with_auth/run.sh +++ b/tests/integration_tests/cli_with_auth/run.sh @@ -147,8 +147,14 @@ EOF fi # Test unsafe commands + echo "Start delete service gc safepoint" echo "y" | run_cdc_cli unsafe delete-service-gc-safepoint + echo "Pass delete service gc safepoint" + + echo "Start reset" run_cdc_cli unsafe reset --no-confirm --pd=$pd_addr + echo "Pass reset" + REGION_ID=$(pd-ctl -u=$pd_addr region | jq '.regions[0].id') TS=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) # wait for owner online @@ -168,9 +174,8 @@ EOF exit 1 fi - # Fixme: uncomment this after we fix: https://github.com/pingcap/ticdc/issues/866 - # run_cdc_cli unsafe resolve-lock --region=$REGION_ID - # run_cdc_cli unsafe resolve-lock --region=$REGION_ID --ts=$TS + run_cdc_cli unsafe resolve-lock --region=$REGION_ID + run_cdc_cli unsafe resolve-lock --region=$REGION_ID 
--ts=$TS # Test change log level curl -X POST -d '"warn"' http://127.0.0.1:8300/api/v2/log diff --git a/tests/integration_tests/http_api/run.sh b/tests/integration_tests/http_api/run.sh index 7e305c57ed..b29c07c085 100644 --- a/tests/integration_tests/http_api/run.sh +++ b/tests/integration_tests/http_api/run.sh @@ -15,14 +15,13 @@ function run() { return fi - if ! python3 -m pip show requests &> /dev/null; then - echo "requests not installed, installing..." + if ! python3 -m pip show requests &>/dev/null; then + echo "requests not installed, installing..." sudo python3 -m pip install -U requests else echo "requests installed." fi - rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR --multiple-upstream-pd true From a7db8daa6cda138a9a999c99d62d5b7f02c941b0 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 14 Jan 2025 16:06:44 +0800 Subject: [PATCH 5/7] test: adjust test Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/cli_with_auth/run.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration_tests/cli_with_auth/run.sh b/tests/integration_tests/cli_with_auth/run.sh index 079ba9cd4f..48161cb91a 100644 --- a/tests/integration_tests/cli_with_auth/run.sh +++ b/tests/integration_tests/cli_with_auth/run.sh @@ -162,14 +162,15 @@ EOF # Check if the owner is online for i in {1..100}; do - run_cdc_cli capture list | grep -q "\"is-owner\": true" + curl -s -X GET "http://127.0.0.1:8300/api/v2/captures" | grep -q "\"is_coordinator\":true" if [[ $? -eq 0 ]]; then break fi + echo "owner is not online, retry again, this is the $i time" sleep 1 done if [[ $? 
-ne 0 ]]; then - echo "[$(date)] <<<<< owner is not online >>>>>" + echo "[$(date)] <<<<< coordinator is not online >>>>>" run_cdc_cli capture list exit 1 fi From a29e3d5455158002684c90fc8935ebd870adf288 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 14 Jan 2025 16:21:52 +0800 Subject: [PATCH 6/7] test: adjust test 2 Signed-off-by: dongmen <414110582@qq.com> --- .../cli_tls_with_auth/run.sh | 22 ++++++++++++++----- tests/integration_tests/cli_with_auth/run.sh | 6 +---- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/tests/integration_tests/cli_tls_with_auth/run.sh b/tests/integration_tests/cli_tls_with_auth/run.sh index 998142b530..1fb827596e 100644 --- a/tests/integration_tests/cli_tls_with_auth/run.sh +++ b/tests/integration_tests/cli_tls_with_auth/run.sh @@ -173,18 +173,30 @@ EOF run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai" fi - # Smoke test unsafe commands + # Test unsafe commands echo "y" | run_cdc_cli unsafe delete-service-gc-safepoint run_cdc_cli unsafe reset --no-confirm --pd=$pd_addr REGION_ID=$(pd-ctl --cacert="${TLS_DIR}/ca.pem" --cert="${TLS_DIR}/client.pem" --key="${TLS_DIR}/client-key.pem" -u=$pd_addr region | jq '.regions[0].id') TS=$(run_cdc_cli_tso_query $TLS_PD_HOST $TLS_PD_PORT true) - # wait for owner online - sleep 3 + + # Check if the coordinator is online + for i in {1..100}; do + curl -s -X GET "http://127.0.0.1:8300/api/v2/captures" | grep -q "\"is_coordinator\":true" + if [[ $? -eq 0 ]]; then + break + fi + echo "owner is not online, retry again, this is the $i time" + sleep 1 + done + if [[ $? 
-ne 0 ]]; then + echo "[$(date)] <<<<< coordinator is not online >>>>>" + run_cdc_cli capture list + exit 1 + fi + run_cdc_cli unsafe resolve-lock --region=$REGION_ID run_cdc_cli unsafe resolve-lock --region=$REGION_ID --ts=$TS - # Smoke test change log level - curl -X POST -d '"warn"' https://127.0.0.1:8300/api/v2/log --cacert "${TLS_DIR}/ca.pem" --cert "${TLS_DIR}/client.pem" --key "${TLS_DIR}/client-key.pem" sleep 3 # make sure TiCDC does not panic curl https://127.0.0.1:8300/status --cacert "${TLS_DIR}/ca.pem" --cert "${TLS_DIR}/client.pem" --key "${TLS_DIR}/client-key.pem" diff --git a/tests/integration_tests/cli_with_auth/run.sh b/tests/integration_tests/cli_with_auth/run.sh index 48161cb91a..106ba118a5 100644 --- a/tests/integration_tests/cli_with_auth/run.sh +++ b/tests/integration_tests/cli_with_auth/run.sh @@ -157,10 +157,8 @@ EOF REGION_ID=$(pd-ctl -u=$pd_addr region | jq '.regions[0].id') TS=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) - # wait for owner online - sleep 5 - # Check if the owner is online + # Check if the coordinator is online for i in {1..100}; do curl -s -X GET "http://127.0.0.1:8300/api/v2/captures" | grep -q "\"is_coordinator\":true" if [[ $? 
-eq 0 ]]; then @@ -178,8 +176,6 @@ EOF run_cdc_cli unsafe resolve-lock --region=$REGION_ID run_cdc_cli unsafe resolve-lock --region=$REGION_ID --ts=$TS - # Test change log level - curl -X POST -d '"warn"' http://127.0.0.1:8300/api/v2/log sleep 3 # make sure TiCDC does not panic curl http://127.0.0.1:8300/status From df32d9e77cf768c02791a7ae57b65c0bce264c7e Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 14 Jan 2025 16:26:50 +0800 Subject: [PATCH 7/7] test: adjust test 3 Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/cli_tls_with_auth/run.sh | 7 ++++--- tests/integration_tests/cli_with_auth/run.sh | 5 ++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/integration_tests/cli_tls_with_auth/run.sh b/tests/integration_tests/cli_tls_with_auth/run.sh index 1fb827596e..ae0186d17a 100644 --- a/tests/integration_tests/cli_tls_with_auth/run.sh +++ b/tests/integration_tests/cli_tls_with_auth/run.sh @@ -176,12 +176,11 @@ EOF # Test unsafe commands echo "y" | run_cdc_cli unsafe delete-service-gc-safepoint run_cdc_cli unsafe reset --no-confirm --pd=$pd_addr - REGION_ID=$(pd-ctl --cacert="${TLS_DIR}/ca.pem" --cert="${TLS_DIR}/client.pem" --key="${TLS_DIR}/client-key.pem" -u=$pd_addr region | jq '.regions[0].id') - TS=$(run_cdc_cli_tso_query $TLS_PD_HOST $TLS_PD_PORT true) + # Check if the coordinator is online for i in {1..100}; do - curl -s -X GET "http://127.0.0.1:8300/api/v2/captures" | grep -q "\"is_coordinator\":true" + curl -s -X GET "https://127.0.0.1:8300/api/v2/captures" --cacert "${TLS_DIR}/ca.pem" --cert "${TLS_DIR}/client.pem" --key "${TLS_DIR}/client-key.pem" | grep -q "\"is_coordinator\":true" if [[ $? 
-eq 0 ]]; then break fi @@ -194,6 +193,8 @@ EOF exit 1 fi + REGION_ID=$(pd-ctl --cacert="${TLS_DIR}/ca.pem" --cert="${TLS_DIR}/client.pem" --key="${TLS_DIR}/client-key.pem" -u=$pd_addr region | jq '.regions[0].id') + TS=$(run_cdc_cli_tso_query $TLS_PD_HOST $TLS_PD_PORT true) run_cdc_cli unsafe resolve-lock --region=$REGION_ID run_cdc_cli unsafe resolve-lock --region=$REGION_ID --ts=$TS diff --git a/tests/integration_tests/cli_with_auth/run.sh b/tests/integration_tests/cli_with_auth/run.sh index 106ba118a5..ca538c6935 100644 --- a/tests/integration_tests/cli_with_auth/run.sh +++ b/tests/integration_tests/cli_with_auth/run.sh @@ -155,9 +155,6 @@ EOF run_cdc_cli unsafe reset --no-confirm --pd=$pd_addr echo "Pass reset" - REGION_ID=$(pd-ctl -u=$pd_addr region | jq '.regions[0].id') - TS=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) - # Check if the coordinator is online for i in {1..100}; do curl -s -X GET "http://127.0.0.1:8300/api/v2/captures" | grep -q "\"is_coordinator\":true" @@ -173,6 +170,8 @@ EOF exit 1 fi + REGION_ID=$(pd-ctl -u=$pd_addr region | jq '.regions[0].id') + TS=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) run_cdc_cli unsafe resolve-lock --region=$REGION_ID run_cdc_cli unsafe resolve-lock --region=$REGION_ID --ts=$TS