test: add integration test about concurrent complex transactions#2472
Conversation
Summary of ChangesHello @hongyunyan, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a robust integration test designed to validate TiCDC's ability to handle concurrent and complex transactional workloads. The test simulates a diverse set of operations across e-commerce, banking, and social networking scenarios, ensuring data consistency and replication integrity under high-stress conditions. This enhancement is crucial for verifying the stability and correctness of TiCDC when dealing with real-world, multi-table, and multi-operation transactions. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
/pull-cdc-mysql-integration-heavy |
There was a problem hiding this comment.
Code Review
This pull request introduces a new integration test for concurrent complex transactions, which is a great addition for ensuring system stability under heavy, mixed workloads. The implementation is comprehensive, covering various transaction types across e-commerce, banking, and social networking domains. My review focuses on improving correctness, maintainability, and addressing a few potential race conditions and bugs in the test logic. Key feedback includes fixing a race condition in the workload generation loop, correcting transaction handling for insufficient stock scenarios, and improving the logic for complex mixed transactions. I've also included some suggestions for code simplification and style.
| affected, _ := result.RowsAffected() | ||
| if affected == 0 { | ||
| // Insufficient stock, rollback | ||
| return nil | ||
| } |
There was a problem hiding this comment.
When there is insufficient stock, the function returns nil. The deferred tx.Rollback() will execute, so the transaction is aborted. However, the caller runWorkload will interpret the nil return as a success and incorrectly increment the success and transaction counters. This is a correctness bug. The function should return an error to indicate failure.
| affected, _ := result.RowsAffected() | |
| if affected == 0 { | |
| // Insufficient stock, rollback | |
| return nil | |
| } | |
| affected, _ := result.RowsAffected() | |
| if affected == 0 { | |
| // Insufficient stock, rollback and report error. | |
| return errors.New("insufficient stock") | |
| } |
| for { | ||
| current := atomic.LoadInt64(&txnCounter) | ||
| if current >= *totalTxns { | ||
| break | ||
| } | ||
|
|
||
| // Select random transaction type | ||
| txType := selectTransactionType() | ||
|
|
||
| err := executor.ExecuteTransaction(ctx, txType) | ||
| if err != nil { | ||
| atomic.AddInt64(&failCount, 1) | ||
| log.Warn("Transaction failed", | ||
| zap.Int("worker", workerID), | ||
| zap.String("type", txType), | ||
| zap.Error(err)) | ||
| // Continue on error, don't fail the whole test | ||
| continue | ||
| } | ||
|
|
||
| atomic.AddInt64(&successCount, 1) | ||
| count := atomic.AddInt64(&txnCounter, 1) | ||
|
|
||
| // Periodic logging | ||
| if count%1000 == 0 { | ||
| log.Info("Progress", | ||
| zap.Int64("completed", count), | ||
| zap.Int64("total", *totalTxns), | ||
| zap.Int64("success", atomic.LoadInt64(&successCount)), | ||
| zap.Int64("failed", atomic.LoadInt64(&failCount))) | ||
| } | ||
| } |
There was a problem hiding this comment.
The loop termination logic has a race condition. Multiple workers can read txnCounter when its value is close to *totalTxns and all proceed, causing the total number of transactions to exceed the limit by up to concurrency - 1.
Additionally, txnCounter is only incremented on success. The flag description "total number of transactions to execute" is ambiguous, but typically implies the number of attempts. The current logic ties it to successful transactions.
A better approach is to atomically increment a counter at the start of each loop iteration to claim a "ticket". This resolves the race and clarifies that total-txns refers to attempted transactions.
for {
count := atomic.AddInt64(&txnCounter, 1)
if count > *totalTxns {
// We've gone over the limit, so we shouldn't execute.
// Decrement to correct the final count.
atomic.AddInt64(&txnCounter, -1)
break
}
// Select random transaction type
txType := selectTransactionType()
err := executor.ExecuteTransaction(ctx, txType)
if err != nil {
atomic.AddInt64(&failCount, 1)
log.Warn("Transaction failed",
zap.Int("worker", workerID),
zap.String("type", txType),
zap.Error(err))
// Continue on error, don't fail the whole test
continue
}
atomic.AddInt64(&successCount, 1)
// Periodic logging
if count%1000 == 0 {
log.Info("Progress",
zap.Int64("completed", count),
zap.Int64("total", *totalTxns),
zap.Int64("success", atomic.LoadInt64(&successCount)),
zap.Int64("failed", atomic.LoadInt64(&failCount)))
}
}| case 1: // Update account | ||
| accountID := randomUserID() | ||
| delta := float64(rand.Intn(200)-100) * 10.0 | ||
| if delta > 0 { | ||
| _, err = tx.ExecContext(ctx, | ||
| `UPDATE accounts SET balance = balance + ?, total_in = total_in + ?, version = version + 1 WHERE account_id = ?`, | ||
| delta, delta, accountID) | ||
| } |
There was a problem hiding this comment.
There are two issues in this part of the complexMixedTxn logic:
- When
deltais negative (a withdrawal), the operation is skipped. This makes the "complex mixed" transaction less realistic as it only handles deposits. - The
UPDATEstatement for a positivedeltais missing theupdated_at = ?field, which is inconsistent with other update operations in this test suite.
This should be fixed to correctly handle withdrawals and maintain data consistency.
| case 1: // Update account | |
| accountID := randomUserID() | |
| delta := float64(rand.Intn(200)-100) * 10.0 | |
| if delta > 0 { | |
| _, err = tx.ExecContext(ctx, | |
| `UPDATE accounts SET balance = balance + ?, total_in = total_in + ?, version = version + 1 WHERE account_id = ?`, | |
| delta, delta, accountID) | |
| } | |
| case 1: // Update account | |
| accountID := randomUserID() | |
| delta := float64(rand.Intn(200)-100) * 10.0 | |
| if delta > 0 { | |
| _, err = tx.ExecContext(ctx, | |
| `UPDATE accounts SET balance = balance + ?, total_in = total_in + ?, version = version + 1, updated_at = ? WHERE account_id = ?`, | |
| delta, delta, now, accountID) | |
| } else if delta < 0 { | |
| _, err = tx.ExecContext(ctx, | |
| `UPDATE accounts SET balance = balance + ?, total_out = total_out - ?, version = version + 1, updated_at = ? WHERE account_id = ? AND balance >= ?`, | |
| delta, delta, now, accountID, -delta) | |
| } |
| log.Fatal("DSN must be provided") | ||
| } | ||
|
|
||
| rand.Seed(time.Now().UnixNano()) |
| ctx, cancel := context.WithTimeout(ctx, 5*time.Second) | ||
| defer cancel() | ||
|
|
||
| if err = db.PingContext(ctx); err != nil { | ||
| log.Fatal("Failed to ping database", zap.String("dsn", dsn), zap.Error(err)) |
There was a problem hiding this comment.
The variables ctx and cancel are shadowed here. While this is a common pattern, it can make code harder to read and debug, especially when dealing with multiple nested contexts. Consider using different names for the inner context and cancel function to avoid shadowing.
pingCtx, pingCancel := context.WithTimeout(ctx, 5*time.Second)
defer pingCancel()
if err = db.PingContext(pingCtx); err != nil {
log.Fatal("Failed to ping database", zap.String("dsn", dsn), zap.Error(err))| } | ||
| items = append(items, item) | ||
| } | ||
| rows.Close() |
| return contains(errMsg, "Deadlock") || | ||
| contains(errMsg, "Lock wait timeout") || | ||
| contains(errMsg, "try again later") |
There was a problem hiding this comment.
This can be simplified by using strings.Contains from the standard library. After this change, the now-unused contains and findSubstring functions (lines 758-771) can be removed.
| return contains(errMsg, "Deadlock") || | |
| contains(errMsg, "Lock wait timeout") || | |
| contains(errMsg, "try again later") | |
| return strings.Contains(errMsg, "Deadlock") || | |
| strings.Contains(errMsg, "Lock wait timeout") || | |
| strings.Contains(errMsg, "try again later") |
|
/pull-cdc-mysql-integration-heavy |
|
/test pull-cdc-mysql-integration-heavy |
[LGTM Timeline notifier]Timeline:
|
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: flowbehappy, lidezhu The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
|
/retest |
What problem does this PR solve?
Issue Number: ref #442
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note