Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Initial development is done in **Go** (`opti-sql-go`), which serves as the prima
- `/operators` - SQL operator implementations (filter, join, aggregation, project)
- `/physical-optimizer` - Query plan parsing and optimization
- `/substrait` - Substrait plan integration
- `/operators/OPERATORS.md` - concise reference for operator constructors, behavior and examples

## Branching Model

Expand Down
20 changes: 9 additions & 11 deletions src/Backend/opti-sql-go/Expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,8 @@ func NewLiteralResolve(Type arrow.DataType, Value any) *LiteralResolve {
castVal = float64(v)
}
default:
fmt.Printf("%v did not match any case, of type %T\n", v, v)
castVal = Value
}
fmt.Printf("sotred as -> %v\t%v\n", Type, castVal)
return &LiteralResolve{Type: Type, Value: castVal}
}
func EvalLiteral(l *LiteralResolve, batch *operators.RecordBatch) (arrow.Array, error) {
Expand Down Expand Up @@ -448,37 +446,36 @@ func EvalBinary(b *BinaryExpr, batch *operators.RecordBatch) (arrow.Array, error
if err != nil {
return nil, err
}
ctx := context.Background()
opt := compute.ArithmeticOptions{}
switch b.Op {
// arithmetic
case Addition:
datum, err := compute.Add(context.TODO(), opt, compute.NewDatum(leftArr), compute.NewDatum(rightArr))
datum, err := compute.Add(ctx, opt, compute.NewDatum(leftArr), compute.NewDatum(rightArr))
if err != nil {
return nil, err
}
return unpackDatum(datum)
case Subtraction:
datum, err := compute.Subtract(context.TODO(), opt, compute.NewDatum(leftArr), compute.NewDatum(rightArr))
datum, err := compute.Subtract(ctx, opt, compute.NewDatum(leftArr), compute.NewDatum(rightArr))
if err != nil {
return nil, err
}
return unpackDatum(datum)

case Multiplication:
datum, err := compute.Multiply(context.TODO(), opt, compute.NewDatum(leftArr), compute.NewDatum(rightArr))
datum, err := compute.Multiply(ctx, opt, compute.NewDatum(leftArr), compute.NewDatum(rightArr))
if err != nil {
return nil, err
}
return unpackDatum(datum)
case Division:
datum, err := compute.Divide(context.TODO(), opt, compute.NewDatum(leftArr), compute.NewDatum(rightArr))
datum, err := compute.Divide(ctx, opt, compute.NewDatum(leftArr), compute.NewDatum(rightArr))
if err != nil {
return nil, err
}
return unpackDatum(datum)

// comparisions TODO:
// These return a boolean array
case Equal:
if leftArr.DataType() != rightArr.DataType() {
return nil, ErrCantCompareDifferentTypes(leftArr.DataType(), rightArr.DataType())
Expand Down Expand Up @@ -593,6 +590,7 @@ func NewScalarFunction(function supportedFunctions, Argument Expression) *Scalar
}

func EvalScalarFunction(s *ScalarFunction, batch *operators.RecordBatch) (arrow.Array, error) {
ctx := context.Background()
switch s.Function {
case Upper:
arr, err := EvalExpression(s.Arguments, batch)
Expand All @@ -612,7 +610,7 @@ func EvalScalarFunction(s *ScalarFunction, batch *operators.RecordBatch) (arrow.
if err != nil {
return nil, err
}
datum, err := compute.AbsoluteValue(context.TODO(), compute.ArithmeticOptions{}, compute.NewDatum(arr))
datum, err := compute.AbsoluteValue(ctx, compute.ArithmeticOptions{}, compute.NewDatum(arr))
if err != nil {
return nil, err
}
Expand All @@ -622,7 +620,7 @@ func EvalScalarFunction(s *ScalarFunction, batch *operators.RecordBatch) (arrow.
if err != nil {
return nil, err
}
datum, err := compute.Round(context.TODO(), compute.DefaultRoundOptions, compute.NewDatum(arr))
datum, err := compute.Round(ctx, compute.DefaultRoundOptions, compute.NewDatum(arr))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -657,7 +655,7 @@ func EvalCast(c *CastExpr, batch *operators.RecordBatch) (arrow.Array, error) {

// Use Arrow compute kernel to cast
castOpts := compute.SafeCastOptions(c.TargetType)
out, err := compute.CastArray(context.TODO(), arr, castOpts)
out, err := compute.CastArray(context.Background(), arr, castOpts)
if err != nil {
return nil, fmt.Errorf("cast error: cannot cast %s to %s: %w",
arr.DataType(), c.TargetType, err)
Expand Down
10 changes: 4 additions & 6 deletions src/Backend/opti-sql-go/Expr/expr_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package Expr

import (
"fmt"
"log"
"opti-sql-go/operators"
"testing"
Expand Down Expand Up @@ -1550,7 +1549,7 @@ func TestLikeOperatorSQL(t *testing.T) {
t.Run("name starts with a", func(t *testing.T) {
rc := generateTestColumns()
sqlStatment := "A%"
whereStatment := NewBinaryExpr(NewColumnResolve("name"), Like, NewLiteralResolve(arrow.BinaryTypes.String, string(sqlStatment)))
whereStatment := NewBinaryExpr(NewColumnResolve("name"), Like, NewLiteralResolve(arrow.BinaryTypes.String, sqlStatment))
boolMask, err := EvalExpression(whereStatment, rc)
if err != nil {
t.Fatalf("unexpected error from EvalExpression")
Expand All @@ -1572,7 +1571,7 @@ func TestLikeOperatorSQL(t *testing.T) {
t.Run("name contains li", func(t *testing.T) {
rc := generateTestColumns()
sqlStatment := "%li%"
whereStatment := NewBinaryExpr(NewColumnResolve("name"), Like, NewLiteralResolve(arrow.BinaryTypes.String, string(sqlStatment)))
whereStatment := NewBinaryExpr(NewColumnResolve("name"), Like, NewLiteralResolve(arrow.BinaryTypes.String, sqlStatment))

boolMask, err := EvalExpression(whereStatment, rc)
if err != nil {
Expand Down Expand Up @@ -1624,7 +1623,7 @@ func TestLikeOperatorSQL(t *testing.T) {
t.Run("name is exactly 5 letters", func(t *testing.T) {
rc := generateTestColumns()
sqlStatment := "_____"
whereStatment := NewBinaryExpr(NewColumnResolve("name"), Like, NewLiteralResolve(arrow.BinaryTypes.String, string(sqlStatment)))
whereStatment := NewBinaryExpr(NewColumnResolve("name"), Like, NewLiteralResolve(arrow.BinaryTypes.String, sqlStatment))

boolMask, err := EvalExpression(whereStatment, rc)
if err != nil {
Expand All @@ -1650,7 +1649,7 @@ func TestLikeOperatorSQL(t *testing.T) {
t.Run("name starts with Ch", func(t *testing.T) {
rc := generateTestColumns()
sqlStatment := "Ch%"
whereStatment := NewBinaryExpr(NewColumnResolve("name"), Like, NewLiteralResolve(arrow.BinaryTypes.String, string(sqlStatment)))
whereStatment := NewBinaryExpr(NewColumnResolve("name"), Like, NewLiteralResolve(arrow.BinaryTypes.String, sqlStatment))

boolMask, err := EvalExpression(whereStatment, rc)
if err != nil {
Expand Down Expand Up @@ -1727,7 +1726,6 @@ func TestNullCheckExpr(t *testing.T) {
defer maskArr.Release()

boolMask := maskArr.(*array.Boolean)
fmt.Printf("boolean mask:\t%v\n", boolMask)
if boolMask.Len() != 5 {
t.Fatalf("expected length 5 mask, got %d", boolMask.Len())
}
Expand Down
1 change: 0 additions & 1 deletion src/Backend/opti-sql-go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ var configInstance *Config = &Config{
EnableQueryStats: true,
EnableMemoryStats: true,
},
// TODO: remove hardcoded secretes before production. we are just testing for now
Secretes: secretesConfig{
AccessKey: "DO8013ZT6VDHJ2EM94RN",
SecretKey: "kPvQSMt6naiwe/FhDnzXpYmVE5yzJUsIR0/OJpsUNzo",
Expand Down
2 changes: 0 additions & 2 deletions src/Backend/opti-sql-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"os"
)

// TODO: in the project operators make sure the record batches account for the RowCount field properly.

func main() {
if len(os.Args) > 1 {
if err := config.Decode(os.Args[1]); err != nil {
Expand Down
5 changes: 1 addition & 4 deletions src/Backend/opti-sql-go/operators/Join/hashJoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ import (
"github.com/apache/arrow/go/v17/arrow/memory"
)

// TODO: clean up PR and push again
// TODO: write intergration test for operators to work together
// TODO: see ticket #27
// TODO: take small break from this project to work on inverted index search for a couple days

var (
ErrInvalidJoinClauseCount = func(l, r int) error {
Expand Down Expand Up @@ -395,7 +392,7 @@ func (hj *HashJoinExec) buildOutputArrays(
leftIdxArr arrow.Array,
rightIdxArr arrow.Array,
) ([]arrow.Array, error) {
ctx := context.TODO()
ctx := context.Background()

output := make([]arrow.Array, hj.schema.NumFields())
for i := range len(leftCols) {
Expand Down
158 changes: 158 additions & 0 deletions src/Backend/opti-sql-go/operators/OPERATORS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Operators — quick reference

This document gives a concise overview of the operator model used in this repository, how to construct the most common operators, and what each operator's constructor expects and why. Placeholders like `Expr.Expression` and `RecordBatch` refer to the repository types found under `Expr` and `operators/record.go`.

## What is an Operator?

An operator implements the `operators.Operator` interface:

- `Next(n uint16) (*operators.RecordBatch, error)` — return up to `n` rows (many operators ignore the exact n and read/produce what they need). Returns `io.EOF` when finished.
- `Schema() *arrow.Schema` — the operator's output schema.
- `Close() error` — release resources (files, network handles, etc.).

The basic data unit is `operators.RecordBatch` (schema + Arrow arrays + rowcount). Operators compose: the output of one operator becomes the input (child) of the next.

## Leaf (source) operators

Leaf operators are the pipeline entry points. They read data from some storage and produce `RecordBatch` values.

- CSV source
- Constructor: `project.NewProjectCSVLeaf(io.Reader)`
- Inputs: an `io.Reader` (file, buffer). Produces typed Arrow arrays from CSV columns.
- Notes: simple, fast for local CSVs. Use when you want a streaming CSV source.

- Parquet source
- Constructor: (parquet reader; see project package)
- Inputs: parquet file handle. Produces Arrow arrays preserving parquet types.

- In-memory source
- Constructor: `project.NewInMemoryProjectExec(names []string, columns []any)`
- Inputs: column names and Go slices (used heavily in unit tests).
- Notes: useful for deterministic test inputs and small-memory datasets.

- S3 / NetworkResource
- Use `project.NewStreamReader` to create a network file reader; it enables chunked reading of files that are not on local disk.
- Notes: the repository supports reading remote files; a configuration option lets you download the full remote file first to avoid per-request network latency when the operator needs repeated random access (e.g., for Parquet or when sorting). This is exposed as a NetworkResource / download option in the project/source constructors.
- The result of `project.NewStreamReader(fileName)` can be passed directly to `project.NewProjectCSVLeaf(io.Reader)` and `project.NewParquetSource(readSeeker)`. This is intentional, so that working with S3 files is as seamless as possible.

## How to construct operators — summary of common operators

The pattern is consistent: each operator has a `NewXxx...` constructor that takes one or more child operators, expression descriptors, or configuration params.

### Project (Select)
- Constructor: `project.NewProjectExec(child operators.Operator, exprs []Expr.Expression)`
- Purpose: evaluate a list of projection expressions (column refs, scalar functions, aliases) and return a batch with only the requested columns.
- What to pass in:
- `child` — the input operator to project from (leaf or intermediate op).
- `exprs` — expressions created with `Expr.NewColumnResolve`, `Expr.NewLiteralResolve`, `Expr.NewAlias`, `Expr.NewScalarFunction`, etc.
- Why: keeps expression evaluation centralized and lets downstream operators work with a narrow schema.

### Filter
- Constructor: `filter.NewFilterExec(child operators.Operator, predicate Expr.Expression)`
- Purpose: apply boolean predicates to input rows and emit only matching rows.
- What to pass in:
- `child` — operator producing input rows.
- `predicate` — an `Expr.Expression` that evaluates to boolean (can combine binary operators, scalar functions, null checks).
- Why: decouples predicate evaluation from projection and other operators; filter may buffer results across batches to serve limit-like requests.

### Limit
- Constructor: `filter.NewLimitExec(child operators.Operator, limit uint64)`
- Purpose: stop the pipeline after `limit` rows are emitted.
- What to pass in: the `child` operator and the numeric `limit`.
- Why: simple consumer-side cap; implemented as a thin operator above any child.

### Distinct
- Constructor: `filter.NewDistinctExec(child operators.Operator, colExprs []Expr.Expression)`
- Purpose: remove duplicate rows on the selected key columns.
- What to pass in: `child` and the list of key column expressions.
- Why: used to produce unique values for a given set of columns; often followed by `Sort` for deterministic order.

### Sort / TopK
- Constructors:
- `aggr.NewSortExec(child operators.Operator, sortKeys []aggr.SortKey)` — fully sorts input
- `aggr.NewTopKSortExec(child operators.Operator, sortKeys []aggr.SortKey, k uint16)` — keep top-k
- Purpose: order rows by one or more columns.
- What to pass in:
- `child` — input operator
- `sortKeys` — built with `aggr.NewSortKey(expr Expr.Expression, asc bool)`; multiple keys are combined with `aggr.CombineSortKeys(...)`.
- Why: some consumers require sorted input (ORDER BY) or only the top-k entries (TopK).
- Notes: current implementations read data into memory and sort; care must be taken for large datasets.

### GroupBy / Aggregation
- Constructors:
- `aggr.NewGroupByExec(child operators.Operator, groupExpr []aggr.AggregateFunctions, groupBy []Expr.Expression)` — group-by with aggregates
- `aggr.NewGlobalAggrExec(child operators.Operator, aggExprs []aggr.AggregateFunctions)` — global aggregation (no GROUP BY)
- Purpose: compute aggregates (SUM, AVG, COUNT, MIN, MAX) grouped by one or more columns.
- What to pass in:
- `child` — input operator
- `groupExpr` / `aggExprs` — list of `aggr.AggregateFunctions` (built with `aggr.NewAggregateFunctions(aggr.AggrFunc, Expr.Expression)`) describing the aggregate function and its child expression (usually a column).
- `groupBy` — expressions for the group-by keys (column resolves).
- Why: central place for aggregator logic; constructors validate types (numeric types for SUM/AVG) and construct the output schema.

### Join (HashJoin)
- Constructor: `join.NewHashJoinExec(left, right operators.Operator, clause join.JoinClause, joinType join.JoinType, filters []Expr.Expression)`
- Purpose: perform hash-based joins (Inner, Left, Right).
- What to pass in:
- `left`, `right` — child operators for the two sides of the join (usually scans or projections)
- `clause` — `join.NewJoinClause(leftExprs []Expr.Expression, rightExprs []Expr.Expression)` describing which columns pair together (supports multiple equality clauses)
- `joinType` — `join.InnerJoin`, `join.LeftJoin`, etc.
- `filters` — optional post-join filters (not yet implemented, and not planned soon, since these can be expressed as separate Filter operations downstream of the join)
- Why: joins combine rows from two inputs. The constructor validates schema compatibility and builds the combined output schema (prefixing duplicate column names with `left_`/`right_`).
- Implementation notes: the HashJoin reads the entirety of both children (current implementation) into memory and builds a hash table on the right side for probing.

## Common constructor patterns & rationale

- Child operator(s) always come first: most operators are constructed around one input (`child`) or two (`left`, `right`). This makes pipelines composable.
- Expressions are passed as `Expr.Expression` objects. Use the `Expr` package helpers to build column resolves, literals, scalar functions, binary operators and aliases.
- Constructors perform validation: type checking for aggregates, matching # of join expressions, or validity of projection expressions — this fails fast at construction time instead of at runtime.
- Many blocking operators (Sort, GroupBy, Join) read the full input before producing output. Be careful with large inputs — these operators are not yet externalized (spill-to-disk) and may require configuration or chunking for large datasets.

## Practical examples (pseudocode)

- Project + Filter + Limit pipeline:

```go
src := project.NewProjectCSVLeaf(fileReader)
pred := Expr.NewBinaryExpr(Expr.NewColumnResolve("age"), Expr.GreaterThan, Expr.NewLiteralResolve(arrow.PrimitiveTypes.Int64, 30))
filt, _ := filter.NewFilterExec(src, pred)
projExprs := Expr.NewExpressions(Expr.NewColumnResolve("id"), Expr.NewColumnResolve("name"))
proj, _ := project.NewProjectExec(filt, projExprs)
lim, _ := filter.NewLimitExec(proj, 10)
batch, _ := lim.Next(10)
```

- GroupBy example:

```go
col := func(n string) Expr.Expression { return Expr.NewColumnResolve(n) }
aggs := []aggr.AggregateFunctions{{AggrFunc: aggr.Sum, Child: col("salary")}}
gb, _ := aggr.NewGroupByExec(src, aggs, []Expr.Expression{col("department")})
result, _ := gb.Next(1000)
```

- HashJoin example (equality on `id`):

```go
clause := join.NewJoinClause([]Expr.Expression{Expr.NewColumnResolve("id")}, []Expr.Expression{Expr.NewColumnResolve("id")})
j, _ := join.NewHashJoinExec(leftSrc, rightSrc, clause, join.InnerJoin, nil)
batch, _ := j.Next(100)
```

## Notes & best practices

- Always call `Close()` on the root operator when done (after `Next` returns `io.EOF`) to release files and network handles.
- Use `project.NewInMemoryProjectExec` for tests — it builds reproducible `RecordBatch` inputs quickly.
- When writing pipelines that may read remote files, prefer to configure the source to download the whole file if the operator will need random access or many read passes (sorting, joining, grouping). This avoids repeated network calls and unpredictable latency.
- Watch out for duplicate column names after joins: the join constructor prefixes with `left_`/`right_` when needed.

## Where to look next in the codebase
- `operators/record.go` — `Operator` interface and `RecordBatch` helpers (builder, PrettyPrint).
- `operators/project/` — project implementations and CSV/parquet readers.
- `operators/filter/` — Filter, Limit, Distinct operator implementations.
- `operators/aggr/` — Sort, TopK, GroupBy and aggregate implementations.
- `operators/Join/` — HashJoin implementation.

Reading the tests
-----------------

For concrete examples of how SQL statements map to operator pipelines, read the integration/unit tests in `operators/test/` (and other test files under `operators/`). The tests build real pipelines (CSV/InMemory -> Filter/Project/Join/GroupBy/Sort/etc.) and show the exact constructor calls and expressions used to represent SQL queries. They are the best source of truth for small end-to-end examples.
16 changes: 16 additions & 0 deletions src/Backend/opti-sql-go/operators/aggr/groupBy.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,16 @@ func buildDynamicArray(mem memory.Allocator, dt arrow.DataType, values []any) ar
// ===========================
// UNSUPPORTED TYPE
// ===========================
case arrow.BOOL:
b := array.NewBooleanBuilder(mem)
for _, v := range values {
if v == nil {
b.AppendNull()
} else {
b.Append(castToBool(v))
}
}
return b.NewArray()
default:
panic(fmt.Sprintf("unsupported dynamic array type: %v", dt))
}
Expand All @@ -440,3 +450,9 @@ func buildFloatArray(mem memory.Allocator, values []float64) arrow.Array {
b.AppendValues(values, nil)
return b.NewArray()
}
// castToBool converts a loosely-typed cell value (bool or its string form)
// into a bool for the Boolean array builder.
//
// Accepted inputs: true, "true", false, "false". Anything else indicates a
// type-mismatch bug upstream, so we panic — mirroring buildDynamicArray's
// panic on unsupported dynamic array types — instead of silently coercing
// invalid values to false.
func castToBool(v any) bool {
	switch v {
	case true, "true":
		return true
	case false, "false":
		return false
	default:
		panic(fmt.Sprintf("castToBool: unsupported value %v (type %T)", v, v))
	}
}
2 changes: 0 additions & 2 deletions src/Backend/opti-sql-go/operators/aggr/groupBy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func TestNewGroupByExecAndSchema(t *testing.T) {
if schema == nil {
t.Fatalf("schema should not be nil")
}
fmt.Println(schema)

// group-by + 1 agg = 2 fields
if got, want := schema.NumFields(), 2; got != want {
Expand Down Expand Up @@ -198,7 +197,6 @@ func TestNewGroupByExecAndSchema(t *testing.T) {
}

schema := gb.Schema()
fmt.Printf("schema: %v\n", schema)
wantFields := len(groupBy) + len(aggs)
if schema.NumFields() != wantFields {
t.Fatalf("expected %d fields, got %d", wantFields, schema.NumFields())
Expand Down
2 changes: 1 addition & 1 deletion src/Backend/opti-sql-go/operators/aggr/singleAggr.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func validAggrType(dt arrow.DataType) bool {
}

func castArrayToFloat64(arr arrow.Array) (arrow.Array, error) {
outDatum, err := compute.CastArray(context.TODO(), arr, compute.NewCastOptions(&arrow.Float64Type{}, true))
outDatum, err := compute.CastArray(context.Background(), arr, compute.NewCastOptions(&arrow.Float64Type{}, true))
if err != nil {
return nil, err
}
Expand Down
Loading