Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ concurrency:
cancel-in-progress: true

jobs:
basic_e2e_test:
# To boost the test speed, we split every 10 test cases into a group.
e2e_test_group_1:
runs-on: ubuntu-latest
name: E2E Test
steps:
Expand Down Expand Up @@ -71,11 +72,21 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_reconstruct

- name: Test common_1
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=common_1

- name: Test foreign_key
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=foreign_key

- name: Upload test logs
if: always()
uses: ./.github/actions/upload-test-logs
with:
log-name: basic_e2e_group1
log-name: e2e_test_group_1


failover_e2e_test1:
Expand Down
2 changes: 1 addition & 1 deletion logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {
}

// Note: need write ddl event to disk before update ddl history,
// becuase other goroutines may read ddl events from disk according to ddl history
// because other goroutines may read ddl events from disk according to ddl history
writePersistedDDLEvent(p.db, &ddlEvent)

p.mu.Lock()
Expand Down
6 changes: 4 additions & 2 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ type persistStorageDDLHandler struct {
updateDDLHistoryFunc func(args updateDDLHistoryFuncArgs) []uint64
// updateSchemaMetadataFunc update database info, table info and partition info according to the ddl event
updateSchemaMetadataFunc func(args updateSchemaMetadataFuncArgs)
// iteratehEventTablesFunc call `apply` for all tables related to the ddl event
// iterateEventTablesFunc iterates through all physical table IDs affected by the DDL event
// and calls the provided `apply` function with those IDs. For partition tables, it includes
// all partition IDs.
iterateEventTablesFunc func(event *PersistedDDLEvent, apply func(tableIDs ...int64))
// extractTableInfoFunc extract (table info, deleted) for the specified `tableID` from ddl event
extractTableInfoFunc func(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool)
// buildDDLEvent build a DDLEvent from a PersistedDDLEvent
buildDDLEventFunc func(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent
buildDDLEventFunc func(event *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent
}

var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{
Expand Down
16 changes: 8 additions & 8 deletions logservice/schemastore/schema_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ type schemaStore struct {

notifyCh chan interface{}

// resolved ts pending for apply
// pendingResolvedTs is the largest resolvedTs the pending ddl events
pendingResolvedTs atomic.Uint64

// max resolvedTs of all applied ddl events
// resolvedTs is the largest resolvedTs of all applied ddl events
// Invariant: resolvedTs >= pendingResolvedTs
resolvedTs atomic.Uint64

// the following two fields are used to filter out duplicate ddl events
Expand Down Expand Up @@ -107,7 +107,7 @@ func New(
kvStorage,
upperBound.ResolvedTs,
s.writeDDLEvent,
s.advanceResolvedTs)
s.advancePendingResolvedTs)
return s
}

Expand Down Expand Up @@ -138,9 +138,6 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
resolvedPhyTs := oracle.ExtractPhysical(pendingTs)
resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3
metrics.SchemaStoreResolvedTsLagGauge.Set(float64(resolvedLag))
// log.Info("advance resolved ts",
// zap.Uint64("resolveTs", pendingTs),
// zap.Float64("lag(s)", resolvedLag))
}()

if pendingTs <= s.resolvedTs.Load() {
Expand Down Expand Up @@ -308,7 +305,9 @@ func (s *schemaStore) writeDDLEvent(ddlEvent DDLJobWithCommitTs) {
}
}

func (s *schemaStore) advanceResolvedTs(resolvedTs uint64) {
// advancePendingResolvedTs will be call by ddlJobFetcher when it fetched a new ddl event
// it will update the pendingResolvedTs and notify the updateResolvedTs goroutine to apply the ddl event
func (s *schemaStore) advancePendingResolvedTs(resolvedTs uint64) {
for {
currentTs := s.pendingResolvedTs.Load()
if resolvedTs <= currentTs {
Expand All @@ -325,6 +324,7 @@ func (s *schemaStore) advanceResolvedTs(resolvedTs uint64) {
}

// TODO: use notify instead of sleep
// waitResolvedTs will wait until the schemaStore resolved ts is greater than or equal to ts.
func (s *schemaStore) waitResolvedTs(tableID int64, ts uint64, logInterval time.Duration) {
start := time.Now()
lastLogTime := time.Now()
Expand Down
29 changes: 0 additions & 29 deletions tests/integration_tests/clustered_index/conf/diff_config.toml

This file was deleted.

182 changes: 0 additions & 182 deletions tests/integration_tests/clustered_index/data/test.sql

This file was deleted.

64 changes: 0 additions & 64 deletions tests/integration_tests/clustered_index/run.sh

This file was deleted.

Loading