Conversation
There was a problem hiding this comment.
Pull request overview
This PR implements hash join functionality for the opti-sql-go query engine. The implementation adds a HashJoinExec operator that performs inner joins using hash-based joining, along with a placeholder for SortMergeJoinExec. The PR also includes a minor fix to error message capitalization in an unrelated file.
Key Changes:
- Implements the `HashJoinExec` operator with support for multi-attribute join keys and SQL NULL semantics
- Adds comprehensive test coverage for hash join scenarios including simple keys, multi-attribute keys, and computed keys
- Fixes error message capitalization to follow Go conventions
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 25 comments.
| File | Description |
|---|---|
| src/Backend/opti-sql-go/operators/filter/limit.go | Fixes error message capitalization (uppercase to lowercase) |
| src/Backend/opti-sql-go/operators/Join/hashJoin.go | Implements hash join and sort-merge join operators with join schema handling and hash table building logic |
| src/Backend/opti-sql-go/operators/Join/hashJoin_test.go | Adds comprehensive test suite for hash join functionality including edge cases and multi-attribute joins |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| t.Run("playground", func(t *testing.T) { | ||
| left, right := newSources() | ||
| joinPred := NewJoinClause(Expr.NewExpressions(Expr.NewColumnResolve("id")), Expr.NewExpressions(Expr.NewColumnResolve("id"))) | ||
| smjExec, err := NewHashJoinExec(left, right, joinPred, InnerJoin, nil) | ||
| if err != nil { | ||
| t.Fatalf("unexpected error: %v", err) | ||
| } | ||
| v, _ := smjExec.Next(5) | ||
| t.Logf("expected schema:\t\n%v\n\n", smjExec.Schema()) | ||
| t.Logf("recieved schema:\t\n%v\n\n", v.Schema) | ||
| t.Logf("\t\n\n\t%+v\n", v.PrettyPrint()) | ||
|
|
||
| }) |
There was a problem hiding this comment.
Missing test cleanup: Test resources are not being properly released. The left and right sources created at line 197 should be closed after the test completes. Add a defer statement:
defer left.Close()
defer right.Close()
| leftSource, _ := project.NewInMemoryProjectExecFromArrays(leftNames, leftCols) | ||
| rightSource, _ := project.NewInMemoryProjectExecFromArrays(rightNames, rightCols) | ||
|
|
||
| leftExprs := Expr.NewExpressions(Expr.NewColumnResolve("email_lower")) | ||
| rightExprs := Expr.NewExpressions(Expr.NewColumnResolve("email_lower")) | ||
| clause := NewJoinClause(leftExprs, rightExprs) | ||
|
|
||
| hj, err := NewHashJoinExec(leftSource, rightSource, clause, InnerJoin, nil) | ||
| if err != nil { | ||
| t.Fatalf("NewHashJoinExec failed: %v", err) | ||
| } | ||
|
|
||
| defer func() { | ||
| if err := hj.Close(); err != nil { | ||
| t.Fatalf("HashJoinExec Close failed: %v", err) | ||
| } | ||
|
|
||
| }() | ||
| batches := collectAllRows(t, hj) | ||
| totalRows := flattenRowCount(batches) | ||
|
|
||
| // Overlap on email_lower: alice + charlie => 2 rows. | ||
| if totalRows != 2 { | ||
| t.Fatalf("expected 2 joined rows on email_lower, got %d", totalRows) | ||
| } | ||
|
|
||
| if len(batches) == 0 { | ||
| t.Fatal("expected at least one batch") | ||
| } | ||
| first := batches[0] | ||
|
|
||
| leftEmailExpr := Expr.NewColumnResolve("left_email_lower") | ||
| rightEmailExpr := Expr.NewColumnResolve("right_email_lower") | ||
|
|
||
| leftArr, err := Expr.EvalExpression(leftEmailExpr, first) | ||
| if err != nil { | ||
| t.Fatalf("EvalExpression left_email_lower failed: %v", err) | ||
| } | ||
| defer leftArr.Release() | ||
|
|
||
| rightArr, err := Expr.EvalExpression(rightEmailExpr, first) | ||
| if err != nil { | ||
| t.Fatalf("EvalExpression right_email_lower failed: %v", err) | ||
| } | ||
| defer rightArr.Release() | ||
|
|
||
| lStr := leftArr.(*array.String) | ||
| rStr := rightArr.(*array.String) | ||
|
|
||
| if lStr.Len() != rStr.Len() { | ||
| t.Fatalf("expected same length for left/right email arrays, got %d vs %d", | ||
| lStr.Len(), rStr.Len()) | ||
| } | ||
| for i := 0; i < lStr.Len(); i++ { | ||
| if lStr.IsNull(i) || rStr.IsNull(i) { | ||
| t.Fatalf("unexpected NULL email at row %d", i) | ||
| } | ||
| } |
There was a problem hiding this comment.
Missing test cleanup: The test sources leftSource and rightSource created at lines 522-523 are not closed. Add cleanup:
defer leftSource.Close()
defer rightSource.Close()
|
||
| func TestHashJoin(t *testing.T) { | ||
| // Simple passing test | ||
| "github.com/apache/arrow/go/v15/arrow/memory" |
There was a problem hiding this comment.
Unused import: The package github.com/apache/arrow/go/v15/arrow/memory is imported at line 12 but appears to be unused. The code uses memory.NewGoAllocator() which comes from the v17 import at line 18. Consider removing this unused import.
| "github.com/apache/arrow/go/v15/arrow/memory" |
| func NewHashJoinExec(left operators.Operator, right operators.Operator, clause JoinClause, joinType JoinType, filters []Expr.Expression) (*HashJoinExec, error) { | ||
| fmt.Printf("join clause: \t%v\njoin Type: \t%v\n", clause.String(), joinType) | ||
| schema, err := joinSchemas(left.Schema(), right.Schema()) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if len(clause.leftS) != len(clause.rightS) { | ||
| return nil, ErrInvalidJoinClauseCount(len(clause.leftS), len(clause.rightS)) | ||
| } | ||
| return &HashJoinExec{ | ||
| leftSource: left, | ||
| rightSource: right, | ||
| clause: clause, | ||
| joinType: joinType, | ||
| filters: filters, | ||
| schema: schema, | ||
| outputBatch: make([]arrow.Array, schema.NumFields()), | ||
| }, nil | ||
| } | ||
|
|
||
| func (hj *HashJoinExec) Next(_ uint16) (*operators.RecordBatch, error) { | ||
| if hj.done { | ||
| return nil, io.EOF | ||
| } | ||
| mem := memory.NewGoAllocator() | ||
| leftArr, err := consumeOperator(hj.leftSource, mem) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| rightArr, err := consumeOperator(hj.rightSource, mem) | ||
| if err != nil { | ||
| return nil, err | ||
| } |
There was a problem hiding this comment.
Missing documentation: The HashJoinExec struct and its methods lack documentation. Public types and exported methods should have doc comments explaining their purpose, behavior, and usage. Add doc comments for HashJoinExec, NewHashJoinExec, Next, Schema, and Close.
| func NewHashJoinExec(left operators.Operator, right operators.Operator, clause JoinClause, joinType JoinType, filters []Expr.Expression) (*HashJoinExec, error) { | |
| fmt.Printf("join clause: \t%v\njoin Type: \t%v\n", clause.String(), joinType) | |
| schema, err := joinSchemas(left.Schema(), right.Schema()) | |
| if err != nil { | |
| return nil, err | |
| } | |
| if len(clause.leftS) != len(clause.rightS) { | |
| return nil, ErrInvalidJoinClauseCount(len(clause.leftS), len(clause.rightS)) | |
| } | |
| return &HashJoinExec{ | |
| leftSource: left, | |
| rightSource: right, | |
| clause: clause, | |
| joinType: joinType, | |
| filters: filters, | |
| schema: schema, | |
| outputBatch: make([]arrow.Array, schema.NumFields()), | |
| }, nil | |
| } | |
| func (hj *HashJoinExec) Next(_ uint16) (*operators.RecordBatch, error) { | |
| if hj.done { | |
| return nil, io.EOF | |
| } | |
| mem := memory.NewGoAllocator() | |
| leftArr, err := consumeOperator(hj.leftSource, mem) | |
| if err != nil { | |
| return nil, err | |
| } | |
| rightArr, err := consumeOperator(hj.rightSource, mem) | |
| if err != nil { | |
| return nil, err | |
| } |
| } | ||
| v, _ := smjExec.Next(5) | ||
| t.Logf("expected schema:\t\n%v\n\n", smjExec.Schema()) | ||
| t.Logf("recieved schema:\t\n%v\n\n", v.Schema) |
There was a problem hiding this comment.
Spelling error: "recieved" should be "received".
| t.Logf("recieved schema:\t\n%v\n\n", v.Schema) | |
| t.Logf("received schema:\t\n%v\n\n", v.Schema) |
|
|
||
| type JoinType int | ||
|
|
||
| const ( | ||
| InnerJoin JoinType = iota | ||
| LeftJoin |
There was a problem hiding this comment.
Missing documentation: The JoinType type and its constants lack documentation. Add doc comments explaining the type and each constant value (InnerJoin, LeftJoin, RightJoin).
| type JoinType int | |
| const ( | |
| InnerJoin JoinType = iota | |
| LeftJoin | |
| // JoinType represents the type of SQL join operation to perform. | |
| type JoinType int | |
| const ( | |
| // InnerJoin represents an inner join, returning rows when there is a match in both tables. | |
| InnerJoin JoinType = iota | |
| // LeftJoin represents a left outer join, returning all rows from the left table and matched rows from the right table. | |
| LeftJoin | |
| // RightJoin represents a right outer join, returning all rows from the right table and matched rows from the left table. |
| rightSource operators.Operator | ||
| clause JoinClause | ||
| joinType JoinType | ||
| filters []Expr.Expression //TODO: incorpoarte |
There was a problem hiding this comment.
Typo in comment: "incorpoarte" should be "incorporate".
| filters []Expr.Expression //TODO: incorpoarte | |
| filters []Expr.Expression //TODO: incorporate |
| func (smj *SortMergeJoinExec) Next(n uint16) (*operators.RecordBatch, error) { | ||
| if smj.done { | ||
| return nil, io.EOF | ||
| } | ||
| return nil, nil | ||
| } |
There was a problem hiding this comment.
The SortMergeJoinExec operator interface is not being satisfied correctly. The variable declaration on line 28 references SortMergeJoinExec but assigns it the wrong type. However, looking at the pattern, this appears to be a compile-time check. The more critical issue is that SortMergeJoinExec should likely not be in this file at all since this PR is for implementing hash join, or if it is, the implementation should be complete (currently Next() just returns nil, nil).
| func buildComptables(exprs []Expr.Expression, cols []arrow.Array, schema *arrow.Schema) ([]arrow.Array, error) { | ||
| compArr := make([]arrow.Array, len(exprs)) | ||
| for i, expr := range exprs { | ||
| arr, err := Expr.EvalExpression(expr, &operators.RecordBatch{ | ||
| Schema: schema, | ||
| Columns: cols, | ||
| RowCount: uint64(cols[0].Len()), | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| compArr[i] = arr | ||
| } | ||
| return compArr, nil | ||
|
|
There was a problem hiding this comment.
Resource leak: Arrays created in buildComptables are never released. The arrays returned from Expr.EvalExpression at line 364-371 should be managed and released appropriately. Either the caller should release them or this function should handle cleanup in error cases. Consider documenting ownership semantics or adding proper cleanup.
| } | ||
|
|
||
| rightComp, err := buildComptables(hj.clause.rightS, rightArr, hj.rightSource.Schema()) | ||
| if err != nil { | ||
| return nil, err | ||
| } |
There was a problem hiding this comment.
Missing resource cleanup: The arrays in leftComp and rightComp obtained from buildComptables (lines 283-291) are never released. These should be cleaned up to prevent memory leaks. Consider adding deferred release calls for all arrays in both slices.
| } | |
| rightComp, err := buildComptables(hj.clause.rightS, rightArr, hj.rightSource.Schema()) | |
| if err != nil { | |
| return nil, err | |
| } | |
| } | |
| for _, arr := range leftComp { | |
| defer arr.Release() | |
| } | |
| rightComp, err := buildComptables(hj.clause.rightS, rightArr, hj.rightSource.Schema()) | |
| if err != nil { | |
| return nil, err | |
| } | |
| for _, arr := range rightComp { | |
| defer arr.Release() | |
| } |
No description provided.