From 4a80a01795a9b34ff9eb25b03ffd7e1dc9701454 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 19 May 2026 12:26:44 +0800 Subject: [PATCH 1/2] cdc: mask sink uri secrets in errors --- cdc/api/v1/validator.go | 5 ++++- cdc/api/v2/api_helpers.go | 9 +++++++- cdc/api/v2/changefeed.go | 5 ++++- cdc/model/changefeed.go | 6 +++--- cdc/processor/processor.go | 10 +++++++-- cdc/sink/ddlsink/factory/factory.go | 5 ++++- cdc/sink/dmlsink/factory/factory.go | 5 ++++- cdc/sink/validator/validator.go | 7 +++++-- cdc/sink/validator/validator_test.go | 24 ++++++++++++++++++--- cdc/syncpointstore/syncpoint_store.go | 6 +++++- pkg/check/cluster.go | 9 ++++++-- pkg/check/cluster_test.go | 12 +++++++---- pkg/cmd/redo/apply.go | 5 ++++- pkg/config/sink.go | 5 ++++- pkg/sink/observer/observer.go | 6 +++++- pkg/util/uri.go | 30 ++++++++++++++++++++++----- pkg/util/uri_test.go | 21 +++++++++++++++++++ 17 files changed, 140 insertions(+), 30 deletions(-) diff --git a/cdc/api/v1/validator.go b/cdc/api/v1/validator.go index b0697093e8..b1e97b42b6 100644 --- a/cdc/api/v1/validator.go +++ b/cdc/api/v1/validator.go @@ -112,7 +112,10 @@ func verifyCreateChangefeedConfig( // verify replicaConfig sinkURIParsed, err := url.Parse(changefeedConfig.SinkURI) if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return nil, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(changefeedConfig.SinkURI)) } err = replicaConfig.ValidateAndAdjust(sinkURIParsed) if err != nil { diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 88935c6d2d..f424011bac 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -213,7 +213,10 @@ func (h APIV2HelpersImpl) verifyCreateChangefeedConfig( // verify replicaConfig sinkURIParsed, err := url.Parse(cfg.SinkURI) if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return nil, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(cfg.SinkURI)) } err = replicaCfg.ValidateAndAdjust(sinkURIParsed) if err != nil { @@ -343,6 +346,10 @@ func (h APIV2HelpersImpl) verifyUpdateChangefeedConfig( } sinkURIParsed, err := url.Parse(sinkURI) if err != nil { + err = cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURI)) return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } err = newInfo.Config.ValidateAndAdjust(sinkURIParsed) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 0ebc4efe78..4aee9bae27 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -367,7 +367,10 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) { uri, err := url.Parse(cfg.SinkURI) if err != nil { - _ = c.Error(err) + _ = c.Error(cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(cfg.SinkURI))) return } scheme := uri.Scheme diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index be23f64c78..b9c4ef8cbe 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -348,7 +348,7 @@ func (info *ChangeFeedInfo) RmUnusedFields() { log.Warn( "failed to parse the sink uri", zap.Error(err), - zap.Any("sinkUri", info.SinkURI), + zap.Any("sinkURI", util.MaskSensitiveDataInURIForError(info.SinkURI)), ) return } @@ -488,7 +488,7 @@ func (info *ChangeFeedInfo) fixState() { func (info *ChangeFeedInfo) fixMySQLSinkProtocol() { uri, err := url.Parse(info.SinkURI) if err != nil { - log.Warn("parse sink URI failed", zap.Error(err)) + log.Warn("parse sink URI failed", zap.Error(util.MaskSensitiveDataInURLError(err))) // SAFETY: It is safe to ignore this unresolvable sink URI here, // as it is almost impossible for this to happen. // If we ignore it when fixing it after it happens, @@ -518,7 +518,7 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() { func (info *ChangeFeedInfo) fixMQSinkProtocol() { uri, err := url.Parse(info.SinkURI) if err != nil { - log.Warn("parse sink URI failed", zap.Error(err)) + log.Warn("parse sink URI failed", zap.Error(util.MaskSensitiveDataInURLError(err))) return } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index faa32e1348..fbf015e6b4 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -591,7 +591,10 @@ func (p *processor) tick(ctx context.Context) (error, error) { func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) { sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return false, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURIStr)) } scheme := sink.GetScheme(sinkURI) return sink.IsMySQLCompatibleScheme(scheme), nil @@ -609,7 +612,10 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) { func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) { sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURIStr)) } scheme := sink.GetScheme(sinkURI) if !sink.IsMySQLCompatibleScheme(scheme) { diff --git a/cdc/sink/ddlsink/factory/factory.go b/cdc/sink/ddlsink/factory/factory.go index 31f7764639..a12e990ad8 100644 --- a/cdc/sink/ddlsink/factory/factory.go +++ b/cdc/sink/ddlsink/factory/factory.go @@ -43,7 +43,10 @@ func New( ) (ddlsink.Sink, error) { sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return nil, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURIStr)) } scheme := sink.GetScheme(sinkURI) switch scheme { diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 057f4b21b1..a526b27ee4 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -75,7 +75,10 @@ func New( ) (*SinkFactory, error) { sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return nil, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURIStr)) } s := &SinkFactory{} diff --git a/cdc/sink/validator/validator.go b/cdc/sink/validator/validator.go index 161002e338..e816ca8658 100644 --- a/cdc/sink/validator/validator.go +++ b/cdc/sink/validator/validator.go @@ -74,7 +74,7 @@ func checkSyncPointSchemeCompatibility( return cerror.ErrSinkURIInvalid. GenWithStack( "sink uri scheme is not supported with syncpoint enabled"+ - "sink uri: %s", uri, + "sink uri: %s", util.MaskSensitiveDataInURIForError(uri.String()), ) } return nil @@ -90,7 +90,10 @@ func preCheckSinkURI(sinkURIStr string) (*url.URL, error) { sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return nil, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURIStr)) } // Check if we use the correct IPv6 address format. diff --git a/cdc/sink/validator/validator_test.go b/cdc/sink/validator/validator_test.go index ccfd8c125f..9276fe1da4 100644 --- a/cdc/sink/validator/validator_test.go +++ b/cdc/sink/validator/validator_test.go @@ -27,9 +27,10 @@ func TestPreCheckSinkURI(t *testing.T) { t.Parallel() tests := []struct { - name string - uri string - err string + name string + uri string + err string + notContains string }{ { name: "valid domain MySQL URI", @@ -76,6 +77,12 @@ func TestPreCheckSinkURI(t *testing.T) { uri: "kafka://3333:10:9:101::204:9092/topic1", err: "sink uri host is not valid IPv6 address", }, + { + name: "invalid escaped URI masks password", + uri: "mysql://root:verysecure@127.0.0.1/%zz", + err: `parse ""`, + notContains: "verysecure", + }, } for _, tt := range tests { @@ -85,6 +92,9 @@ func TestPreCheckSinkURI(t *testing.T) { _, err := preCheckSinkURI(test.uri) if test.err != "" { require.Contains(t, err.Error(), test.err) + if test.notContains != "" { + require.NotContains(t, err.Error(), test.notContains) + } } else { require.NoError(t, err) } @@ -126,4 +136,12 @@ func TestValidateSink(t *testing.T) { t, err.Error(), "sink uri scheme is not supported with syncpoint enabled", ) + require.NotContains(t, err.Error(), "verysecure") + require.NotContains(t, err.Error(), "sasl-password=verysecure") + + sinkURI = "kafka://127.0.0.1:9092/topic?sasl-password=verysecure" + err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil) + require.NotNil(t, err) + require.Contains(t, err.Error(), "sasl-password=xxxxx") + require.NotContains(t, err.Error(), "verysecure") } diff --git a/cdc/syncpointstore/syncpoint_store.go b/cdc/syncpointstore/syncpoint_store.go index d69493c8a7..7be317f114 100644 --- a/cdc/syncpointstore/syncpoint_store.go +++ b/cdc/syncpointstore/syncpoint_store.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/util" ) // SyncPointStore is an abstraction for anything that a changefeed may emit into. @@ -45,7 +46,10 @@ func NewSyncPointStore( // parse sinkURI as a URI sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return nil, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURIStr)) } switch strings.ToLower(sinkURI.Scheme) { case "mysql", "tidb", "mysql+ssl", "tidb+ssl": diff --git a/pkg/check/cluster.go b/pkg/check/cluster.go index b6704198f6..565a7675d9 100644 --- a/pkg/check/cluster.go +++ b/pkg/check/cluster.go @@ -24,6 +24,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" + "github.com/pingcap/tiflow/pkg/util" pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -48,7 +49,8 @@ func UpstreamDownstreamNotSame(ctx context.Context, zap.Uint64("upID", upID), zap.Uint64("downID", downID), zap.Bool("isTiDB", isTiDB)) if err != nil { log.Error("failed to get cluster ID from sink URI", - zap.String("downSinkURI", downSinkURI), zap.Error(err)) + zap.String("downSinkURI", util.MaskSensitiveDataInURIForError(downSinkURI)), + zap.Error(err)) return false, cerror.Trace(err) } @@ -70,7 +72,10 @@ func getClusterIDBySinkURI( // Create a MySQL connection by using the sink URI. url, err := url.Parse(sinkURI) if err != nil { - return 0, true, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return 0, true, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURI)) } if !sink.IsMySQLCompatibleScheme(sink.GetScheme(url)) { return 0, false, nil diff --git a/pkg/check/cluster_test.go b/pkg/check/cluster_test.go index 8f31c8c2e8..91958f86fe 100644 --- a/pkg/check/cluster_test.go +++ b/pkg/check/cluster_test.go @@ -22,7 +22,6 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" @@ -58,6 +57,7 @@ func TestGetClusterIDBySinkURI(t *testing.T) { wantClusterID uint64 wantIsTiDB bool wantErr error + wantNotInErr string }{ { name: "non mysql scheme", @@ -65,9 +65,10 @@ func TestGetClusterIDBySinkURI(t *testing.T) { wantIsTiDB: false, }, { - name: "invalid uri", - sinkURI: ":invalid:", - wantErr: cerror.ErrSinkURIInvalid.Wrap(errors.New("parse \":invalid:\": missing protocol scheme")), + name: "invalid uri", + sinkURI: "mysql://user:verysecure@127.0.0.1/%zz", + wantErr: errors.New(`parse "": invalid URL escape "%zz"`), + wantNotInErr: "verysecure", }, { name: "connect error", @@ -136,6 +137,9 @@ func TestGetClusterIDBySinkURI(t *testing.T) { if tc.wantErr != nil { require.Error(t, err) require.Contains(t, err.Error(), tc.wantErr.Error()) + if tc.wantNotInErr != "" { + require.NotContains(t, err.Error(), tc.wantNotInErr) + } } else { require.NoError(t, err) require.Equal(t, tc.wantClusterID, clusterID) diff --git a/pkg/cmd/redo/apply.go b/pkg/cmd/redo/apply.go index e64213c17d..bbb29fef66 100644 --- a/pkg/cmd/redo/apply.go +++ b/pkg/cmd/redo/apply.go @@ -57,7 +57,10 @@ func (o *applyRedoOptions) complete(cmd *cobra.Command) error { // parse sinkURI as a URI sinkURI, err := url.Parse(o.sinkURI) if err != nil { - return cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(o.sinkURI)) } rawQuery := sinkURI.Query() // set safe-mode to true if not set diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 9b2f77b601..489a5f004d 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -943,7 +943,10 @@ func (s *SinkConfig) CheckCompatibilityWithSinkURI( ) error { sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURIStr)) } cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol || diff --git a/pkg/sink/observer/observer.go b/pkg/sink/observer/observer.go index da79048271..251d9406d0 100644 --- a/pkg/sink/observer/observer.go +++ b/pkg/sink/observer/observer.go @@ -25,6 +25,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" + "github.com/pingcap/tiflow/pkg/util" ) // Observer defines an interface of downstream performance observer. @@ -67,7 +68,10 @@ func NewObserver( sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return nil, cerror.WrapError( + cerror.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURIStr)) } scheme := strings.ToLower(sinkURI.Scheme) diff --git a/pkg/util/uri.go b/pkg/util/uri.go index 18bdb01bd5..c67badd5f5 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -17,9 +17,6 @@ import ( "net" "net/url" "strings" - - "github.com/pingcap/log" - "go.uber.org/zap" ) // IsValidIPv6AddressFormatInURI reports whether hostPort is a valid IPv6 address in URI. @@ -70,7 +67,6 @@ func validOptionalPort(port string) bool { func MaskSinkURI(uri string) (string, error) { uriParsed, err := url.Parse(uri) if err != nil { - log.Error("failed to parse sink URI", zap.Error(err)) return "", err } queries := uriParsed.Query() @@ -99,7 +95,6 @@ var sensitiveQueryParameterNames = []string{ func MaskSensitiveDataInURI(uri string) string { uriParsed, err := url.Parse(uri) if err != nil { - log.Error("failed to parse sink URI", zap.Error(err)) return "" } queries := uriParsed.Query() @@ -114,3 +109,28 @@ func MaskSensitiveDataInURI(uri string) string { uriParsed.RawQuery = queries.Encode() return uriParsed.Redacted() } + +// MaskSensitiveDataInURIForError masks sensitive data in a URI for error messages. +func MaskSensitiveDataInURIForError(uri string) string { + maskedURI := MaskSensitiveDataInURI(uri) + if maskedURI == "" && uri != "" { + return "" + } + return maskedURI +} + +// MaskSensitiveDataInURLError masks the URL carried by net/url errors. +func MaskSensitiveDataInURLError(err error) error { + if err == nil { + return nil + } + urlErr, ok := err.(*url.Error) + if !ok { + return err + } + return &url.Error{ + Op: urlErr.Op, + URL: MaskSensitiveDataInURIForError(urlErr.URL), + Err: urlErr.Err, + } +} diff --git a/pkg/util/uri_test.go b/pkg/util/uri_test.go index 788fb38c59..006537e457 100644 --- a/pkg/util/uri_test.go +++ b/pkg/util/uri_test.go @@ -14,6 +14,7 @@ package util import ( + "net/url" "testing" "github.com/stretchr/testify/require" @@ -134,3 +135,23 @@ func TestMaskSensitiveDataInURI(t *testing.T) { require.Equal(t, tt.masked, maskedURI) } } + +func TestMaskSensitiveDataInURIForError(t *testing.T) { + require.Equal(t, "", MaskSensitiveDataInURIForError("")) + require.Equal(t, "abc", MaskSensitiveDataInURIForError("abc")) + require.Equal(t, + "mysql://root:xxxxx@127.0.0.1:3306/?sasl-password=xxxxx", + MaskSensitiveDataInURIForError("mysql://root:verysecure@127.0.0.1:3306/?sasl-password=rawsecret")) + require.Equal(t, "", MaskSensitiveDataInURIForError("mysql://root:verysecure@127.0.0.1/%zz")) +} + +func TestMaskSensitiveDataInURLError(t *testing.T) { + rawURL := "mysql://root:verysecure@127.0.0.1/%zz" + _, err := url.Parse(rawURL) + require.Error(t, err) + + maskedErr := MaskSensitiveDataInURLError(err) + require.NotContains(t, maskedErr.Error(), "verysecure") + require.Contains(t, maskedErr.Error(), `parse ""`) + require.Contains(t, maskedErr.Error(), "invalid URL escape") +} From 7b3e4523e710765246df8f433671a4858cee8258 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 19 May 2026 14:15:20 +0800 Subject: [PATCH 2/2] cdc: address gemini review comments --- cdc/model/changefeed.go | 2 +- cdc/sink/validator/validator.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index b9c4ef8cbe..3ecec4da4c 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -347,7 +347,7 @@ func (info *ChangeFeedInfo) RmUnusedFields() { if err != nil { log.Warn( "failed to parse the sink uri", - zap.Error(err), + zap.Error(util.MaskSensitiveDataInURLError(err)), zap.Any("sinkURI", util.MaskSensitiveDataInURIForError(info.SinkURI)), ) return diff --git a/cdc/sink/validator/validator.go b/cdc/sink/validator/validator.go index e816ca8658..5a8c535a8f 100644 --- a/cdc/sink/validator/validator.go +++ b/cdc/sink/validator/validator.go @@ -74,7 +74,7 @@ func checkSyncPointSchemeCompatibility( return cerror.ErrSinkURIInvalid. GenWithStack( "sink uri scheme is not supported with syncpoint enabled"+ - "sink uri: %s", util.MaskSensitiveDataInURIForError(uri.String()), + " sink uri: %s", util.MaskSensitiveDataInURIForError(uri.String()), ) } return nil