Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/io/file/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type fileWriter struct {
Hook writerHooks
Start time.Time
Count int
Size int64
Compress string
fileBuffer *writer.BufioWrapWriter
// Whether the file has written any data. It is only used to determine if new line is needed when writing data.
Expand Down Expand Up @@ -87,10 +88,14 @@ func (m *fileSink) createFileWriter(ctx api.StreamContext, fn string, ft FileTyp
if err != nil {
return nil, err
}
_, err = fws.Writer.Write(fws.Hook.Header())
header := fws.Hook.Header()
_, err = fws.Writer.Write(header)
if err != nil {
return nil, err
}
if m.c.RollingSize > 0 {
fws.Size = int64(len(header))
}
return fws, nil
}

Expand Down
17 changes: 14 additions & 3 deletions internal/io/file/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type sinkConf struct {
RollingNamePattern string `json:"rollingNamePattern"` // where to add the timestamp to the file name
RollingHook string `json:"rollingHook"`
RollingHookProps map[string]any `json:"rollingHookProps"`
RollingSize int64 `json:"rollingSize"`
CheckInterval cast.DurationConf `json:"checkInterval"`
Path string `json:"path"` // support dynamic property, when rolling, make sure the path is updated
FileType FileType `json:"fileType"`
Expand Down Expand Up @@ -80,8 +81,8 @@ func (m *fileSink) Provision(ctx api.StreamContext, props map[string]interface{}
if c.CheckInterval < 0 {
return fmt.Errorf("checkInterval must be positive")
}
if c.RollingInterval == 0 && c.RollingCount == 0 {
return fmt.Errorf("one of rollingInterval and rollingCount must be set")
if c.RollingInterval == 0 && c.RollingCount == 0 && c.RollingSize == 0 {
return fmt.Errorf("one of rollingInterval, rollingCount, or rollingSize must be set")
}
if c.RollingInterval > 0 && c.RollingInterval < c.CheckInterval {
c.CheckInterval = c.RollingInterval
Expand Down Expand Up @@ -182,10 +183,14 @@ func (m *fileSink) Collect(ctx api.StreamContext, tuple api.RawTuple) error {
m.mux.Lock()
defer m.mux.Unlock()
if fw.Written {
_, e := fw.Writer.Write(fw.Hook.Line())
lineBytes := fw.Hook.Line()
_, e := fw.Writer.Write(lineBytes)
if e != nil {
return e
}
if m.c.RollingSize > 0 {
fw.Size += int64(len(lineBytes))
}
} else {
fw.Written = true
}
Expand All @@ -199,6 +204,12 @@ func (m *fileSink) Collect(ctx api.StreamContext, tuple api.RawTuple) error {
return m.roll(ctx, fn, fw)
}
}
if m.c.RollingSize > 0 {
fw.Size += int64(len(item))
if fw.Size >= m.c.RollingSize {
return m.roll(ctx, fn, fw)
}
}
return nil
}

Expand Down
187 changes: 187 additions & 0 deletions internal/io/file/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,20 @@ func TestFileSink_Configure(t *testing.T) {
"fields": []string{"c", "a", "b"},
},
},
{ // only set rolling size
name: "rollingSize",
c: &sinkConf{
CheckInterval: cast.DurationConf(defaultCheckInterval),
Path: "cache",
FileType: LINES_TYPE,
RollingSize: 1024,
RollingCount: 0,
},
p: map[string]interface{}{
"rollingSize": 1024,
"rollingCount": 0,
},
},
}
ctx := mockContext.NewMockContext("test1", "test")
for _, tt := range tests {
Expand Down Expand Up @@ -781,3 +795,176 @@ func Decrypt(contents []byte) []byte {
dstream.XORKeyStream(decrypted, secret)
return decrypted
}

// TestFileSinkRollingSize_Collect verifies size-based file rolling in the file
// sink: once the byte count accumulated in the current file reaches
// rollingSize, the sink rolls over to a new file. Covers the lines and JSON
// file types, with and without compression (the threshold is applied to the
// uncompressed byte count — see the gzip/zstd cases below).
func TestFileSinkRollingSize_Collect(t *testing.T) {
	// Remove leftover .log files from previous runs so the file-count
	// assertions below are not polluted by stale output.
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if filepath.Ext(path) == ".log" {
			fmt.Println("Deleting file:", path)
			return os.Remove(path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	conf.IsTesting = true

	tests := []struct {
		name          string
		ft            FileType // sink file type (lines or json)
		fname         string   // path handed to the sink; rolled files add a suffix
		rollingSize   int64    // byte threshold that triggers a roll
		dataSize      int      // size of one generated item in bytes (informational)
		dataCount     int      // number of items collected per subtest
		expectedFiles int      // files expected once all items are written
		compress      string   // compression algorithm, empty for none
	}{
		{
			name:          "lines_size_rolling",
			ft:            LINES_TYPE,
			fname:         "test_size_lines.log",
			rollingSize:   100, // 100 bytes
			dataSize:      33,  // actual: {"index":N,"data":"test_value_N"} = 33 bytes
			dataCount:     10,  // 10 items: 33 + (1+33) + (1+33) = 101 bytes -> roll after 3 items
			expectedFiles: 4,   // Post-write check: 3+3+3+1 items = 4 files
		},
		{
			name:          "json_size_rolling",
			ft:            JSON_TYPE,
			fname:         "test_size_json.log",
			rollingSize:   100, // 100 bytes
			dataSize:      33,  // same data size
			dataCount:     10,  // JSON: "[" (1) + item (33) + "," (1) + item (33) + "," (1) + item (33) = 103
			expectedFiles: 4,   // Post-write check: 3+3+3+1 items = 4 files
		},
		{
			name:          "lines_size_rolling_gzip",
			ft:            LINES_TYPE,
			fname:         "test_size_lines_gzip.log",
			rollingSize:   100, // 100 bytes (before compression)
			dataSize:      33,
			dataCount:     10,
			expectedFiles: 4, // Same as lines without compression
			compress:      GZIP,
		},
		{
			name:          "json_size_rolling_zstd",
			ft:            JSON_TYPE,
			fname:         "test_size_json_zstd.log",
			rollingSize:   100,
			dataSize:      33,
			dataCount:     10,
			expectedFiles: 4, // Same as JSON without compression
			compress:      ZSTD,
		},
	}

	ctx := mockContext.NewMockContext("rule", "testRollingSize")

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			sink := &fileSink{}
			// Disable count/interval rolling so only rollingSize governs rolls.
			err := sink.Provision(ctx, map[string]interface{}{
				"path":               tt.fname,
				"fileType":           tt.ft,
				"rollingSize":        tt.rollingSize,
				"rollingCount":       0,
				"rollingInterval":    0,
				"rollingNamePattern": "suffix",
				"compression":        tt.compress,
			})
			if err != nil {
				t.Fatal(err)
			}

			// Use mockclock to generate unique timestamps; rolled file names
			// are timestamp-suffixed, so each roll must see a distinct time.
			mockclock.ResetClock(100)
			err = sink.Connect(ctx, func(status string, message string) {})
			if err != nil {
				t.Fatal(err)
			}
			c := mockclock.GetMockClock()

			// Collect data
			for i := 0; i < tt.dataCount; i++ {
				// Advance clock to ensure unique timestamps for each roll
				c.Add(10 * time.Millisecond)
				data := fmt.Sprintf("{\"index\":%d,\"data\":\"test_value_%d\"}", i, i)
				if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte(data)}); err != nil {
					t.Errorf("unexpected error: %s", err)
				}
			}

			// Close flushes and finalizes the last (partial) file.
			if err = sink.Close(ctx); err != nil {
				t.Errorf("unexpected close error: %s", err)
			}

			// Check if the expected number of files were created.
			// NOTE(review): the glob keys off the file type, so it assumes the
			// previous subtest's cleanup removed its files — subtests run
			// sequentially here, which makes that safe.
			files, err := filepath.Glob(fmt.Sprintf("test_size_%s*.log", tt.ft))
			if err != nil {
				t.Fatal(err)
			}

			if len(files) != tt.expectedFiles {
				t.Errorf("expected %d files, but got %d files: %v", tt.expectedFiles, len(files), files)
			}

			// Cleanup
			for _, f := range files {
				os.Remove(f)
			}
		})
	}
}

// TestFileSink_ProvisionRollingSize checks provision-time validation of the
// rollingSize option: rollingSize alone is a sufficient rolling condition,
// the absence of every rolling condition is rejected, and rollingSize may be
// combined with rollingCount.
func TestFileSink_ProvisionRollingSize(t *testing.T) {
	ctx := mockContext.NewMockContext("test1", "test")

	// Case 1: rollingSize by itself is accepted and parsed into the config.
	sinkSizeOnly := &fileSink{}
	if err := sinkSizeOnly.Provision(ctx, map[string]interface{}{
		"path":        "test.log",
		"rollingSize": 1024,
	}); err != nil {
		t.Errorf("Provision with rollingSize should succeed, got error: %v", err)
	}
	if sinkSizeOnly.c.RollingSize != 1024 {
		t.Errorf("Expected RollingSize 1024, got %d", sinkSizeOnly.c.RollingSize)
	}

	// Case 2: with interval, count, and size all zero, Provision must fail.
	sinkNoCondition := &fileSink{}
	if err := sinkNoCondition.Provision(ctx, map[string]interface{}{
		"path":            "test.log",
		"rollingSize":     0,
		"rollingCount":    0,
		"rollingInterval": 0,
	}); err == nil {
		t.Error("Provision should fail when no rolling condition is set")
	}

	// Case 3: rollingSize together with rollingCount is also valid, and both
	// values land in the config.
	sinkCombined := &fileSink{}
	if err := sinkCombined.Provision(ctx, map[string]interface{}{
		"path":         "test.log",
		"rollingSize":  2048,
		"rollingCount": 100,
	}); err != nil {
		t.Errorf("Provision with multiple rolling conditions should succeed, got error: %v", err)
	}
	if sinkCombined.c.RollingSize != 2048 {
		t.Errorf("Expected RollingSize 2048, got %d", sinkCombined.c.RollingSize)
	}
	if sinkCombined.c.RollingCount != 100 {
		t.Errorf("Expected RollingCount 100, got %d", sinkCombined.c.RollingCount)
	}
}
1 change: 1 addition & 0 deletions internal/io/file/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func TestWatchDir(t *testing.T) {
r := &WatchWrapper{f: &Source{}}
go func() {
time.Sleep(100 * time.Millisecond)
timex.Set(654321)
src, err := os.Open(filepath.Join(path, "test", "test.lines"))
require.NoError(t, err)
defer src.Close()
Expand Down
Loading