Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 45 additions & 14 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,20 @@ jobs:
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_C

- name: Upload test logs
if: always()
uses: ./.github/actions/upload-test-logs
- name: Copy logs to hack permission
if: ${{ always() }}
run: |
TEMP_DIR=$(mktemp -d)
mkdir -p "$TEMP_DIR"
sudo find /tmp/tidb_cdc_test -type d -name 'fail_over*' -exec sh -c 'find "{}" -type f -name "cdc*.log" -print0' \; | tar -czvf logs.tar.gz -C /tmp/tidb_cdc_test --null -T -
# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
- name: Upload logs
uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
log-name: failover_group1
name: upstream-failover-logs1
path: |
./logs.tar.gz


failover_e2e_test2:
Expand Down Expand Up @@ -163,13 +172,21 @@ jobs:
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_G

- name: Upload test logs
if: always()
uses: ./.github/actions/upload-test-logs
- name: Copy logs to hack permission
if: ${{ always() }}
run: |
TEMP_DIR=$(mktemp -d)
mkdir -p "$TEMP_DIR"
sudo find /tmp/tidb_cdc_test -type d -name 'fail_over*' -exec sh -c 'find "{}" -type f -name "cdc*.log" -print0' \; | tar -czvf logs.tar.gz -C /tmp/tidb_cdc_test --null -T -
# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
- name: Upload logs
uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
log-name: failover_group_2
name: upstream-failover-logs2
path: |
./logs.tar.gz


failover_e2e_test3:
runs-on: ubuntu-latest
name: Failover E2E Test[H-K]
Expand All @@ -188,14 +205,28 @@ jobs:
go build -o ./tools/bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl
make integration_test_build
ls -l bin/ && ls -l tools/bin/


- name: Test fail_over_ddl_I
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_I

- name: Test fail_over_ddl_J
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_J

- name: Upload test logs
if: always()
uses: ./.github/actions/upload-test-logs
- name: Copy logs to hack permission
if: ${{ always() }}
run: |
TEMP_DIR=$(mktemp -d)
mkdir -p "$TEMP_DIR"
sudo find /tmp/tidb_cdc_test -type d -name 'fail_over*' -exec sh -c 'find "{}" -type f -name "cdc*.log" -print0' \; | tar -czvf logs.tar.gz -C /tmp/tidb_cdc_test --null -T -
# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
- name: Upload logs
uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
log-name: failover-group3
name: upstream-failover-logs3
path: |
./logs.tar.gz
24 changes: 17 additions & 7 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ type Controller struct {
bootstrapped *atomic.Bool
bootstrapper *bootstrap.Bootstrapper[heartbeatpb.CoordinatorBootstrapResponse]

nodeChanged *atomic.Bool
mutex sync.Mutex // protect nodeChanged and do onNodeChanged()
nodeChanged bool
nodeManager *watcher.NodeManager

stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
Expand Down Expand Up @@ -106,7 +107,7 @@ func NewController(
stream: stream,
taskScheduler: taskScheduler,
backend: backend,
nodeChanged: atomic.NewBool(false),
nodeChanged: false,
updatedChangefeedCh: updatedChangefeedCh,
stateChangedCh: stateChangedCh,
lastPrintStatusTime: time.Now(),
Expand All @@ -116,7 +117,9 @@ func NewController(
nodes := c.nodeManager.GetAliveNodes()
// detect the capture changes
c.nodeManager.RegisterNodeChangeHandler("coordinator-controller", func(allNodes map[node.ID]*node.Info) {
c.nodeChanged.Store(true)
c.mutex.Lock()
c.nodeChanged = true
c.mutex.Unlock()
})
log.Info("changefeed bootstrap initial nodes",
zap.Int("nodes", len(nodes)))
Expand All @@ -143,10 +146,8 @@ func (c *Controller) HandleEvent(event *Event) bool {
}
}()
// first check the online/offline nodes
if c.nodeChanged.Load() {
c.onNodeChanged()
c.nodeChanged.Store(false)
}
c.checkOnNodeChanged()

switch event.eventType {
case EventMessage:
c.onMessage(event.message)
Expand All @@ -156,6 +157,15 @@ func (c *Controller) HandleEvent(event *Event) bool {
return false
}

func (c *Controller) checkOnNodeChanged() {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.nodeChanged {
c.onNodeChanged()
c.nodeChanged = false
}
}

func (c *Controller) onPeriodTask() {
// resend bootstrap message
c.sendMessages(c.bootstrapper.ResendBootstrapMessage())
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
time.Sleep(30 * time.Second)
})
} else {
failpoint.Inject("WaitBeforePass", nil)
failpoint.Inject("BlockBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
}
Expand Down
4 changes: 2 additions & 2 deletions logservice/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ func (c *logCoordinator) handleNodeChange(allNodes map[node.ID]*node.Info) {
for id := range c.nodes.m {
if _, ok := allNodes[id]; !ok {
delete(c.nodes.m, id)
log.Info("log coordinaotr detect node removed", zap.String("nodeId", id.String()))
log.Info("log coordinator detect node removed", zap.String("nodeId", id.String()))
}
}
for id, node := range allNodes {
if _, ok := c.nodes.m[id]; !ok {
c.nodes.m[id] = node
log.Info("log coordinaotr detect node added", zap.String("nodeId", id.String()))
log.Info("log coordinator detect node added", zap.String("nodeId", id.String()))
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions tests/integration_tests/fail_over_ddl_I/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/fail_over_ddl_I/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["fail_over_ddl_test2.*", "fail_over_ddl_test.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
36 changes: 36 additions & 0 deletions tests/integration_tests/fail_over_ddl_I/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
create database `fail_over_ddl_test2`;
use `fail_over_ddl_test2`;

create table t1 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

create table t2 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

INSERT INTO t1 (val, col0) VALUES (1, 1);
INSERT INTO t1 (val, col0) VALUES (2, 2);
INSERT INTO t1 (val, col0) VALUES (3, 3);
INSERT INTO t1 (val, col0) VALUES (4, 4);
INSERT INTO t1 (val, col0) VALUES (5, 5);
INSERT INTO t1 (val, col0) VALUES (6, 6);
INSERT INTO t1 (val, col0) VALUES (7, 7);
INSERT INTO t1 (val, col0) VALUES (8, 8);
INSERT INTO t1 (val, col0) VALUES (9, 9);
INSERT INTO t1 (val, col0) VALUES (10, 10);

INSERT INTO t2 (val, col0) VALUES (1, 1);
INSERT INTO t2 (val, col0) VALUES (2, 2);
INSERT INTO t2 (val, col0) VALUES (3, 3);
INSERT INTO t2 (val, col0) VALUES (4, 4);
INSERT INTO t2 (val, col0) VALUES (5, 5);
INSERT INTO t2 (val, col0) VALUES (6, 6);
INSERT INTO t2 (val, col0) VALUES (7, 7);
INSERT INTO t2 (val, col0) VALUES (8, 8);
INSERT INTO t2 (val, col0) VALUES (9, 9);
INSERT INTO t2 (val, col0) VALUES (10, 10);
Loading