diff --git a/src/Backend/opti-sql-go/operators/filter/limit.go b/src/Backend/opti-sql-go/operators/filter/limit.go
index 6161c1b..57b77d6 100644
--- a/src/Backend/opti-sql-go/operators/filter/limit.go
+++ b/src/Backend/opti-sql-go/operators/filter/limit.go
@@ -1,14 +1,24 @@
 package filter
 
 import (
+	"context"
+	"errors"
+	"fmt"
 	"io"
+	"math"
+	"opti-sql-go/Expr"
 	"opti-sql-go/operators"
+	"strings"
 
 	"github.com/apache/arrow/go/v17/arrow"
+	"github.com/apache/arrow/go/v17/arrow/array"
+	"github.com/apache/arrow/go/v17/arrow/compute"
+	"github.com/apache/arrow/go/v17/arrow/memory"
 )
 
 var (
 	_ = (operators.Operator)(&LimitExec{})
+	_ = (operators.Operator)(&DistinctExec{})
 )
 
 type LimitExec struct {
@@ -70,12 +80,231 @@ func (l *LimitExec) Close() error {
 	return l.input.Close()
 }
 
-/*
-type Distinct struct {
-	child operators.Operator
-	schema *arrow.Schema
-	colExpr Expr.Expression // resolves to column that we want distinct values of
-	seenValues map[string]struct{} // arrow.Array.value(i) (stored and compared as string , structs occupie no space
-	done bool
+// DistinctExec is a pipeline-breaking operator that emits every distinct
+// combination of the colExpr values exactly once, in first-seen order. All
+// input columns are carried through for the rows that are kept.
+type DistinctExec struct {
+	input               operators.Operator
+	schema              *arrow.Schema
+	colExpr             []Expr.Expression   // expressions whose combined value identifies a row
+	seenValues          map[string]struct{} // encoded keys of rows already kept; empty structs occupy no space
+	distinctValuesArray []arrow.Array       // accumulated distinct rows, one array per input column
+	consumedOffset      uint64              // index of the next distinct row to return to the caller
+	consumedInput       bool                // true once the input operator has returned io.EOF
+	totalRows           uint64              // number of distinct rows accumulated from the input
+	done                bool                // true once every distinct row has been returned
+}
+
+// NewDistinctExec builds a DistinctExec over input that de-duplicates rows on
+// the combined value of colExpr. It returns an error when colExpr is empty or
+// input is nil.
+func NewDistinctExec(input operators.Operator, colExpr []Expr.Expression) (*DistinctExec, error) {
+	if len(colExpr) == 0 {
+		return nil, errors.New("Distinct operator requires at least one column expression")
+	}
+	if input == nil {
+		return nil, errors.New("distinct operator requires a non-nil input")
+	}
+	schema := input.Schema()
+	return &DistinctExec{
+		input:               input,
+		schema:              schema,
+		colExpr:             colExpr,
+		seenValues:          make(map[string]struct{}),
+		distinctValuesArray: make([]arrow.Array, len(schema.Fields())),
+	}, nil
+}
+
+// Next returns up to n distinct rows. Distinct is a pipeline breaker: the first
+// call drains the input operator completely, and every call then pages through
+// the accumulated rows until io.EOF is returned.
+func (d *DistinctExec) Next(n uint16) (*operators.RecordBatch, error) {
+	if d.done {
+		return nil, io.EOF
+	}
+	mem := memory.NewGoAllocator()
+	if !d.consumedInput {
+		if err := d.drainInput(mem); err != nil {
+			return nil, err
+		}
+	}
+	remaining := d.totalRows - d.consumedOffset
+	if remaining == 0 { // every distinct row has been handed out
+		d.done = true
+		return nil, io.EOF
+	}
+	readSize := min(remaining, uint64(n))
+	distinctArraySlice, err := d.consumeDistinctArrays(readSize, mem)
+	if err != nil {
+		return nil, err
+	}
+	var rowCount uint64
+	if len(distinctArraySlice) > 0 {
+		rowCount = uint64(distinctArraySlice[0].Len())
+	}
+	return &operators.RecordBatch{
+		Schema:   d.schema,
+		Columns:  distinctArraySlice,
+		RowCount: rowCount,
+	}, nil
+}
+
+// drainInput pulls batches from the input until io.EOF and folds the
+// first-seen rows of each batch into distinctValuesArray.
+func (d *DistinctExec) drainInput(mem memory.Allocator) error {
+	for {
+		childBatch, err := d.input.Next(math.MaxUint16)
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		if err := d.absorbBatch(childBatch, mem); err != nil {
+			return err
+		}
+	}
+	d.consumedInput = true
+	// The first column is still nil when the input produced no rows at all.
+	if len(d.distinctValuesArray) > 0 && d.distinctValuesArray[0] != nil {
+		d.totalRows = uint64(d.distinctValuesArray[0].Len())
+	}
+	return nil
+}
+
+// absorbBatch appends the rows of batch whose key has not been seen before to
+// distinctValuesArray, preserving input order.
+func (d *DistinctExec) absorbBatch(batch *operators.RecordBatch, mem memory.Allocator) error {
+	if len(batch.Columns) > len(d.distinctValuesArray) {
+		return fmt.Errorf("distinct: input batch has %d columns but schema has %d fields", len(batch.Columns), len(d.distinctValuesArray))
+	}
+	// Resolve the columns that define row identity.
+	// NOTE(review): who owns the arrays returned by Expr.EvalExpression is not
+	// visible from this file, so they are deliberately not released here.
+	keyCols := make([]arrow.Array, len(d.colExpr))
+	for i, expr := range d.colExpr {
+		arr, err := Expr.EvalExpression(expr, batch)
+		if err != nil {
+			return fmt.Errorf("distinct: evaluating key expression %d: %w", i, err)
+		}
+		keyCols[i] = arr
+	}
+	var newRows []int32 // batch-local indices of rows seen for the first time
+	numRows := int(batch.RowCount)
+	for rowIdx := 0; rowIdx < numRows; rowIdx++ {
+		key := distinctRowKey(keyCols, rowIdx)
+		if _, seen := d.seenValues[key]; seen {
+			continue
+		}
+		d.seenValues[key] = struct{}{}
+		newRows = append(newRows, int32(rowIdx))
+	}
+	if len(newRows) == 0 {
+		return nil // this batch contributed nothing new
+	}
+	takeArray := idxToArrowArray(newRows, mem)
+	defer takeArray.Release()
+	for i, col := range batch.Columns {
+		uniqueElements, err := compute.TakeArray(context.TODO(), col, takeArray)
+		if err != nil {
+			return err
+		}
+		prev := d.distinctValuesArray[i]
+		joined, err := joinArrays(prev, uniqueElements, mem)
+		if err != nil {
+			uniqueElements.Release()
+			return err
+		}
+		// joinArrays hands back one of its inputs when the other is nil or
+		// empty, so release only the arrays that are no longer referenced.
+		if prev != nil && prev != joined {
+			prev.Release()
+		}
+		if uniqueElements != joined {
+			uniqueElements.Release()
+		}
+		d.distinctValuesArray[i] = joined
+	}
+	return nil
+}
+
+// distinctRowKey encodes the values found at row of cols into a map key. Each
+// value is length-prefixed and nulls carry their own tag, so neither a
+// separator inside a value nor the literal text "NULL" can make two different
+// rows collide.
+func distinctRowKey(cols []arrow.Array, row int) string {
+	var b strings.Builder
+	for _, col := range cols {
+		if col.IsNull(row) {
+			b.WriteString("n;")
+			continue
+		}
+		v := col.ValueStr(row)
+		fmt.Fprintf(&b, "v%d:%s;", len(v), v)
+	}
+	return b.String()
+}
+
+// Schema returns the schema of the input operator; distinct does not alter it.
+func (d *DistinctExec) Schema() *arrow.Schema { return d.schema }
+
+// Close releases the accumulated distinct arrays and closes the input.
+func (d *DistinctExec) Close() error {
+	operators.ReleaseArrays(d.distinctValuesArray)
+	return d.input.Close()
+}
+
+// consumeDistinctArrays copies the next readSize distinct rows out of every
+// accumulated column and advances consumedOffset. The caller owns the
+// returned arrays.
+func (d *DistinctExec) consumeDistinctArrays(readSize uint64, mem memory.Allocator) ([]arrow.Array, error) {
+	ctx := context.TODO()
+	resultColumns := make([]arrow.Array, len(d.schema.Fields()))
+	offsetArray := genoffsetTakeIdx(d.consumedOffset, readSize, mem)
+	defer offsetArray.Release()
+	for i, col := range d.distinctValuesArray {
+		slice, err := compute.TakeArray(ctx, col, offsetArray)
+		if err != nil {
+			// Do not leak the columns that were already materialised.
+			for _, taken := range resultColumns[:i] {
+				taken.Release()
+			}
+			return nil, err
+		}
+		resultColumns[i] = slice
+	}
+	d.consumedOffset += readSize
+	return resultColumns, nil
+}
+
+// idxToArrowArray copies v into a new Int32 arrow array that the caller must
+// release.
+func idxToArrowArray(v []int32, mem memory.Allocator) arrow.Array {
+	b := array.NewInt32Builder(mem)
+	defer b.Release()
+	b.AppendValues(v, nil)
+	return b.NewArray()
+}
+
+// joinArrays concatenates a1 and a2. When either side is nil or empty the
+// other one is returned as-is, without copying or retaining it.
+func joinArrays(a1, a2 arrow.Array, mem memory.Allocator) (arrow.Array, error) {
+	if a1 == nil || a1.Len() == 0 {
+		return a2, nil
+	}
+	if a2 == nil || a2.Len() == 0 {
+		return a1, nil
+	}
+	return array.Concatenate([]arrow.Array{a1, a2}, mem)
+}
+
+// genoffsetTakeIdx builds the index array [offset, offset+size) for use with
+// compute.TakeArray; the caller must release it.
+func genoffsetTakeIdx(offset, size uint64, mem memory.Allocator) arrow.Array {
+	b := array.NewUint64Builder(mem)
+	defer b.Release()
+	for i := range size {
+		b.Append(offset + i)
+	}
+	return b.NewArray()
 }
-*/
diff --git a/src/Backend/opti-sql-go/operators/filter/limit_test.go b/src/Backend/opti-sql-go/operators/filter/limit_test.go
index 64cd006..9516a89 100644
--- a/src/Backend/opti-sql-go/operators/filter/limit_test.go
+++ b/src/Backend/opti-sql-go/operators/filter/limit_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/apache/arrow/go/v17/arrow"
 	"github.com/apache/arrow/go/v17/arrow/array"
+	"github.com/apache/arrow/go/v17/arrow/memory"
 )
 
 func generateTestColumns() ([]string, []any) {
@@ -45,11 +46,44 @@ func generateTestColumns() ([]string, []any) {
 
 	return names, columns
 }
+func generateTestColumnsDistinct() ([]string, []any) {
+	names := []string{
+		"city",
+		"state",
+		"product",
+	}
+	columns := []any{
+		// city - lots of repeated values
+		[]string{
+			"Boston", "Boston", "New York", "Boston", "Chicago",
+			"New York", "Boston", "Chicago", "New York", "Boston",
+			"Chicago", "Boston", "New York", "Chicago", "Boston",
+		},
+		// state - corresponds to cities
+		[]string{
+			"MA", "MA", "NY", "MA", "IL",
+			"NY", "MA", "IL", "NY", "MA",
+			"IL", "MA", "NY", "IL", "MA",
+		},
+		// product - repeated products
+		[]string{
+			"Laptop", "Phone", "Laptop", "Mouse", "Laptop",
+			"Phone", "Laptop", "Phone", "Tablet", "Mouse",
+			"Laptop", "Phone", "Laptop", "Tablet", "Mouse",
+		},
+	}
+	return names, columns
+}
 func basicProject() *project.InMemorySource {
 	names, col := generateTestColumns()
 	v, _ := project.NewInMemoryProjectExec(names, col)
 	return v
 }
+func distinctProject() *project.InMemorySource {
+	names, col := generateTestColumnsDistinct()
+	v, _ := project.NewInMemoryProjectExec(names, col)
+	return v
+}
 
 func maskAny(t *testing.T, src *project.InMemorySource, expr Expr.Expression, expected []bool) {
 	t.Helper()
@@ -536,3 +570,259 @@ func TestLikeEdgeCases(t *testing.T) {
 		maskAny(t, src, expr, expected)
 	})
 }
+
+// Distinct test cases
+
+func TestDistinctInit(t *testing.T) {
+	t.Run("distinct no expressions", func(t *testing.T) {
+
+		proj := distinctProject()
+		exprs := []Expr.Expression{}
+		_, err := NewDistinctExec(proj, exprs)
+		if err == nil {
+			t.Fatalf("expected error from passing in no expressions to distinct operator but got nil")
+		}
+	})
+	t.Run("distinct init and interface check", func(t *testing.T) {
+		proj := distinctProject()
+		exprs := []Expr.Expression{
+			Expr.NewColumnResolve("city"),
+		}
+		distinctExec, err := NewDistinctExec(proj, exprs)
+		if err != nil {
+			t.Fatalf("unexpected error creating new distinct operator")
+		}
+		s := distinctExec.Schema()
+		if !s.Equal(proj.Schema()) {
+			t.Fatalf("distinct schema should be the exact same as input but recieved %v instead of %v", s, proj.Schema())
+		}
+		t.Logf("distinct operator %v\n", distinctExec)
+		if err := distinctExec.Close(); err != nil {
+			t.Fatalf("unexpected error occured closing operator %v\n", err)
+		}
+		distinctExec.done = true
+		_, err = distinctExec.Next(3)
+		if !errors.Is(err, io.EOF) {
+			t.Fatalf("expected io.EOF but got %v\n", err)
+		}
+	})
+	t.Run("Basic Next operator test", func(t *testing.T) {
+		proj := distinctProject()
+		exprs := []Expr.Expression{
+			Expr.NewColumnResolve("city"),
+		}
+		distinctExec, err := NewDistinctExec(proj, exprs)
+		if err != nil {
+			t.Fatalf("unexpected error creating new distinct operator")
+		}
+		rc, err := distinctExec.Next(5)
+		if err != nil {
+			t.Fatalf("error occured grabbing next values from distinct operator %v", err)
+		}
+		t.Logf("rc:\t%v\n", rc.PrettyPrint())
+
+	})
+	t.Run("BasicNextOperatorWithMultipleDistinctColumns", func(t *testing.T) {
+		proj := distinctProject()
+		exprs := []Expr.Expression{
+			Expr.NewColumnResolve("city"),
+			Expr.NewColumnResolve("state"),
+		}
+		distinctExec, err := NewDistinctExec(proj, exprs)
+		if err != nil {
+			t.Fatalf("unexpected error creating new distinct operator")
+		}
+		rc, err := distinctExec.Next(5)
+		if err != nil {
+			t.Fatalf("error occured grabbing next values from distinct operator %v", err)
+		}
+		t.Logf("rc:\t%v\n", rc.PrettyPrint())
+
+	})
+}
+func TestDistinctNext(t *testing.T) {
+	t.Run("return limited columns", func(t *testing.T) {
+		proj := distinctProject()
+		exprs := []Expr.Expression{
+			Expr.NewColumnResolve("city"),
+			Expr.NewColumnResolve("state"),
+		}
+		distinctExec, err := NewDistinctExec(proj, exprs)
+		if err != nil {
+			t.Fatalf("unexpected error creating new distinct operator")
+		}
+		batchsize := 1
+		count := 0
+		for {
+			rc, err := distinctExec.Next(uint16(batchsize))
+			if err != nil {
+				if errors.Is(err, io.EOF) {
+					break
+				}
+				t.Fatalf("error occured grabbing next values from distinct operator %v", err)
+			}
+			t.Logf("\t%v\n", rc.PrettyPrint())
+			if rc.RowCount != uint64(batchsize) {
+				t.Fatalf("expected record batch of size %d but got %d", batchsize, rc.RowCount)
+			}
+			count += int(rc.RowCount)
+		}
+		// distinctProject has 3 distinct (city,state) combinations
+		if count != 3 {
+			t.Fatalf("expected total distinct rows 3, got %d", count)
+		}
+	})
+
+	t.Run("single column distinct returns expected order", func(t *testing.T) {
+		proj := distinctProject()
+		exprs := []Expr.Expression{
+			Expr.NewColumnResolve("city"),
+		}
+		distinctExec, err := NewDistinctExec(proj, exprs)
+		if err != nil {
+			t.Fatalf("unexpected error creating new distinct operator")
+		}
+		// request all in one go
+		rc, err := distinctExec.Next(10)
+		if err != nil {
+			t.Fatalf("Next failed: %v", err)
+		}
+		if rc.RowCount != 3 {
+			t.Fatalf("expected 3 distinct cities, got %d", rc.RowCount)
+		}
+		// Expect first-seen order: Boston, New York, Chicago
+		cityArr := rc.Columns[0].(*array.String)
+		expect := []string{"Boston", "New York", "Chicago"}
+		for i := 0; i < int(rc.RowCount); i++ {
+			if cityArr.Value(i) != expect[i] {
+				t.Fatalf("expected city %s at idx %d, got %s", expect[i], i, cityArr.Value(i))
+			}
+		}
+		for _, c := range rc.Columns {
+			c.Release()
+		}
+	})
+
+	t.Run("Next returns EOF after consumption and Close works", func(t *testing.T) {
+		proj := distinctProject()
+		exprs := []Expr.Expression{
+			Expr.NewColumnResolve("city"),
+			Expr.NewColumnResolve("state"),
+		}
+		distinctExec, err := NewDistinctExec(proj, exprs)
+		if err != nil {
+			t.Fatalf("unexpected error creating new distinct operator")
+		}
+		// consume all
+		_, err = distinctExec.Next(10)
+		if err != nil && !errors.Is(err, io.EOF) {
+			t.Fatalf("unexpected error while consuming distinct results: %v", err)
+			// it's ok if we got results; call Next again until EOF
+		}
+		// subsequent Next should return EOF
+		_, err = distinctExec.Next(1)
+		if !errors.Is(err, io.EOF) {
+			t.Fatalf("expected EOF after consuming distinct results, got %v", err)
+		}
+		if err := distinctExec.Close(); err != nil {
+			t.Fatalf("unexpected error on Close: %v", err)
+		}
+	})
+}
+
+func TestJoinArrays(t *testing.T) {
+	mem := memory.NewGoAllocator()
+
+	t.Run("first array nil or empty - returns second", func(t *testing.T) {
+		builder := array.NewInt32Builder(mem)
+		defer builder.Release()
+		builder.AppendValues([]int32{1, 2, 3}, nil)
+		a2 := builder.NewArray()
+		defer a2.Release()
+
+		// Test with nil
+		result, err := joinArrays(nil, a2, mem)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if result.Len() != 3 {
+			t.Fatalf("expected length 3, got %d", result.Len())
+		}
+
+		// Test with empty array
+		emptyBuilder := array.NewInt32Builder(mem)
+		defer emptyBuilder.Release()
+		a1Empty := emptyBuilder.NewArray()
+		defer a1Empty.Release()
+
+		result, err = joinArrays(a1Empty, a2, mem)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if result.Len() != 3 {
+			t.Fatalf("expected length 3, got %d", result.Len())
+		}
+	})
+
+	t.Run("second array nil or empty - returns first", func(t *testing.T) {
+		builder := array.NewInt32Builder(mem)
+		defer builder.Release()
+		builder.AppendValues([]int32{4, 5, 6}, nil)
+		a1 := builder.NewArray()
+		defer a1.Release()
+
+		// Test with nil
+		result, err := joinArrays(a1, nil, mem)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if result.Len() != 3 {
+			t.Fatalf("expected length 3, got %d", result.Len())
+		}
+
+		// Test with empty array
+		emptyBuilder := array.NewInt32Builder(mem)
+		defer emptyBuilder.Release()
+		a2Empty := emptyBuilder.NewArray()
+		defer a2Empty.Release()
+
+		result, err = joinArrays(a1, a2Empty, mem)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if result.Len() != 3 {
+			t.Fatalf("expected length 3, got %d", result.Len())
+		}
+	})
+
+	t.Run("both arrays have data - concatenates", func(t *testing.T) {
+		builder1 := array.NewInt32Builder(mem)
+		defer builder1.Release()
+		builder1.AppendValues([]int32{1, 2, 3}, nil)
+		a1 := builder1.NewArray()
+		defer a1.Release()
+
+		builder2 := array.NewInt32Builder(mem)
+		defer builder2.Release()
+		builder2.AppendValues([]int32{4, 5, 6}, nil)
+		a2 := builder2.NewArray()
+		defer a2.Release()
+
+		result, err := joinArrays(a1, a2, mem)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if result.Len() != 6 {
+			t.Fatalf("expected length 6, got %d", result.Len())
+		}
+
+		// Verify concatenated values
+		int32Result := result.(*array.Int32)
+		expectedValues := []int32{1, 2, 3, 4, 5, 6}
+		for i := 0; i < int32Result.Len(); i++ {
+			if int32Result.Value(i) != expectedValues[i] {
+				t.Fatalf("at index %d: expected %d, got %d", i, expectedValues[i], int32Result.Value(i))
+			}
+		}
+	})
+}