From d00c27f0e5e02e68365cecbdf208cbe9083ef6aa Mon Sep 17 00:00:00 2001 From: Changyong Um Date: Thu, 30 Oct 2025 00:58:30 +0900 Subject: [PATCH 1/2] feat: Add rollingSize option to file sink for size-based file rotation Signed-off-by: Changyong Um --- internal/io/file/file_writer.go | 7 +- internal/io/file/sink.go | 17 ++- internal/io/file/sink_test.go | 187 ++++++++++++++++++++++++++++++++ 3 files changed, 207 insertions(+), 4 deletions(-) diff --git a/internal/io/file/file_writer.go b/internal/io/file/file_writer.go index 42f58d8eb8..d852ae4abd 100644 --- a/internal/io/file/file_writer.go +++ b/internal/io/file/file_writer.go @@ -37,6 +37,7 @@ type fileWriter struct { Hook writerHooks Start time.Time Count int + Size int64 Compress string fileBuffer *writer.BufioWrapWriter // Whether the file has written any data. It is only used to determine if new line is needed when writing data. @@ -87,10 +88,14 @@ func (m *fileSink) createFileWriter(ctx api.StreamContext, fn string, ft FileTyp if err != nil { return nil, err } - _, err = fws.Writer.Write(fws.Hook.Header()) + header := fws.Hook.Header() + _, err = fws.Writer.Write(header) if err != nil { return nil, err } + if m.c.RollingSize > 0 { + fws.Size = int64(len(header)) + } return fws, nil } diff --git a/internal/io/file/sink.go b/internal/io/file/sink.go index dbdf579726..39ddeffee3 100644 --- a/internal/io/file/sink.go +++ b/internal/io/file/sink.go @@ -40,6 +40,7 @@ type sinkConf struct { RollingNamePattern string `json:"rollingNamePattern"` // where to add the timestamp to the file name RollingHook string `json:"rollingHook"` RollingHookProps map[string]any `json:"rollingHookProps"` + RollingSize int64 `json:"rollingSize"` CheckInterval cast.DurationConf `json:"checkInterval"` Path string `json:"path"` // support dynamic property, when rolling, make sure the path is updated FileType FileType `json:"fileType"` @@ -80,8 +81,8 @@ func (m *fileSink) Provision(ctx api.StreamContext, props map[string]interface{} if c.CheckInterval < 0 { return fmt.Errorf("checkInterval must be positive") } - if c.RollingInterval == 0 && c.RollingCount == 0 { - return fmt.Errorf("one of rollingInterval and rollingCount must be set") + if c.RollingInterval == 0 && c.RollingCount == 0 && c.RollingSize == 0 { + return fmt.Errorf("one of rollingInterval, rollingCount, or rollingSize must be set") } if c.RollingInterval > 0 && c.RollingInterval < c.CheckInterval { c.CheckInterval = c.RollingInterval @@ -182,10 +183,14 @@ func (m *fileSink) Collect(ctx api.StreamContext, tuple api.RawTuple) error { m.mux.Lock() defer m.mux.Unlock() if fw.Written { - _, e := fw.Writer.Write(fw.Hook.Line()) + lineBytes := fw.Hook.Line() + _, e := fw.Writer.Write(lineBytes) if e != nil { return e } + if m.c.RollingSize > 0 { + fw.Size += int64(len(lineBytes)) + } } else { fw.Written = true } @@ -199,6 +204,12 @@ func (m *fileSink) Collect(ctx api.StreamContext, tuple api.RawTuple) error { return m.roll(ctx, fn, fw) } } + if m.c.RollingSize > 0 { + fw.Size += int64(len(item)) + if fw.Size >= m.c.RollingSize { + return m.roll(ctx, fn, fw) + } + } return nil } diff --git a/internal/io/file/sink_test.go b/internal/io/file/sink_test.go index a1c340802a..3e898ce216 100644 --- a/internal/io/file/sink_test.go +++ b/internal/io/file/sink_test.go @@ -198,6 +198,20 @@ func TestFileSink_Configure(t *testing.T) { "fields": []string{"c", "a", "b"}, }, }, + { // only set rolling size + name: "rollingSize", + c: &sinkConf{ + CheckInterval: cast.DurationConf(defaultCheckInterval), + Path: "cache", + FileType: LINES_TYPE, + RollingSize: 1024, + RollingCount: 0, + }, + p: map[string]interface{}{ + "rollingSize": 1024, + "rollingCount": 0, + }, + }, } ctx := mockContext.NewMockContext("test1", "test") for _, tt := range tests { @@ -781,3 +795,176 @@ func Decrypt(contents []byte) []byte { dstream.XORKeyStream(decrypted, secret) return decrypted } + +// Test size-based rolling +func TestFileSinkRollingSize_Collect(t *testing.T) { + // Remove existing files + err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if filepath.Ext(path) == ".log" { + fmt.Println("Deleting file:", path) + return os.Remove(path) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + conf.IsTesting = true + + tests := []struct { + name string + ft FileType + fname string + rollingSize int64 + dataSize int + dataCount int + expectedFiles int + compress string + }{ + { + name: "lines_size_rolling", + ft: LINES_TYPE, + fname: "test_size_lines.log", + rollingSize: 100, // 100 bytes + dataSize: 33, // actual: {"index":N,"data":"test_value_N"} = 33 bytes + dataCount: 10, // 10 items: 33 + (1+33) + (1+33) = 101 bytes -> roll after 3 items + expectedFiles: 4, // Post-write check: 3+3+3+1 items = 4 files + }, + { + name: "json_size_rolling", + ft: JSON_TYPE, + fname: "test_size_json.log", + rollingSize: 100, // 100 bytes + dataSize: 33, // same data size + dataCount: 10, // JSON: "[" (1) + item (33) + "," (1) + item (33) + "," (1) + item (33) = 103 + expectedFiles: 4, // Post-write check: 3+3+3+1 items = 4 files + }, + { + name: "lines_size_rolling_gzip", + ft: LINES_TYPE, + fname: "test_size_lines_gzip.log", + rollingSize: 100, // 100 bytes (before compression) + dataSize: 33, + dataCount: 10, + expectedFiles: 4, // Same as lines without compression + compress: GZIP, + }, + { + name: "json_size_rolling_zstd", + ft: JSON_TYPE, + fname: "test_size_json_zstd.log", + rollingSize: 100, + dataSize: 33, + dataCount: 10, + expectedFiles: 4, // Same as JSON without compression + compress: ZSTD, + }, + } + + ctx := mockContext.NewMockContext("rule", "testRollingSize") + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := &fileSink{} + err := sink.Provision(ctx, map[string]interface{}{ + "path": tt.fname, + "fileType": tt.ft, + "rollingSize": tt.rollingSize, + "rollingCount": 0, + "rollingInterval": 0, + "rollingNamePattern": "suffix", + "compression": tt.compress, + }) + if err != nil { + t.Fatal(err) + } + + // Use mockclock to generate unique timestamps + mockclock.ResetClock(100) + err = sink.Connect(ctx, func(status string, message string) {}) + if err != nil { + t.Fatal(err) + } + c := mockclock.GetMockClock() + + // Collect data + for i := 0; i < tt.dataCount; i++ { + // Advance clock to ensure unique timestamps for each roll + c.Add(10 * time.Millisecond) + data := fmt.Sprintf("{\"index\":%d,\"data\":\"test_value_%d\"}", i, i) + if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte(data)}); err != nil { + t.Errorf("unexpected error: %s", err) + } + } + + if err = sink.Close(ctx); err != nil { + t.Errorf("unexpected close error: %s", err) + } + + // Check if the expected number of files were created + files, err := filepath.Glob(fmt.Sprintf("test_size_%s*.log", tt.ft)) + if err != nil { + t.Fatal(err) + } + + if len(files) != tt.expectedFiles { + t.Errorf("expected %d files, but got %d files: %v", tt.expectedFiles, len(files), files) + } + + // Cleanup + for _, f := range files { + os.Remove(f) + } + }) + } +} + +// Test provision validation for rolling size +func TestFileSink_ProvisionRollingSize(t *testing.T) { + ctx := mockContext.NewMockContext("test1", "test") + + // Valid: only rollingSize set + m := &fileSink{} + err := m.Provision(ctx, map[string]interface{}{ + "path": "test.log", + "rollingSize": 1024, + }) + if err != nil { + t.Errorf("Provision with rollingSize should succeed, got error: %v", err) + } + if m.c.RollingSize != 1024 { + t.Errorf("Expected RollingSize 1024, got %d", m.c.RollingSize) + } + + // Invalid: no rolling condition set + m2 := &fileSink{} + err = m2.Provision(ctx, map[string]interface{}{ + "path": "test.log", + "rollingSize": 0, + "rollingCount": 0, + "rollingInterval": 0, + }) + if err == nil { + t.Error("Provision should fail when no rolling condition is set") + } + + // Valid: rollingSize + rollingCount + m3 := &fileSink{} + err = m3.Provision(ctx, map[string]interface{}{ + "path": "test.log", + "rollingSize": 2048, + "rollingCount": 100, + }) + if err != nil { + t.Errorf("Provision with multiple rolling conditions should succeed, got error: %v", err) + } + if m3.c.RollingSize != 2048 { + t.Errorf("Expected RollingSize 2048, got %d", m3.c.RollingSize) + } + if m3.c.RollingCount != 100 { + t.Errorf("Expected RollingCount 100, got %d", m3.c.RollingCount) + } +} From 8b5563da6c4bb98c14aff43b8a0df9ab77cc4431 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Thu, 30 Oct 2025 11:49:00 +0800 Subject: [PATCH 2/2] fix(test): reset watch time Signed-off-by: Jiyong Huang --- internal/io/file/watch_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/io/file/watch_test.go b/internal/io/file/watch_test.go index bffd9c63a9..02b8b8f125 100644 --- a/internal/io/file/watch_test.go +++ b/internal/io/file/watch_test.go @@ -90,6 +90,7 @@ func TestWatchDir(t *testing.T) { r := &WatchWrapper{f: &Source{}} go func() { time.Sleep(100 * time.Millisecond) + timex.Set(654321) src, err := os.Open(filepath.Join(path, "test", "test.lines")) require.NoError(t, err) defer src.Close()