Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses crash states and debugging issues by adding comprehensive structured logging using Zap, implementing panic recovery, fixing memory management in the parquet reader, and changing aggregation column naming conventions.
Changes:
- Added structured logging throughout the codebase using go.uber.org/zap for better debugging and monitoring
- Implemented panic recovery in ExecuteQuery to prevent server crashes from individual query failures
- Rewrote parquet buffering logic to properly handle batch size requests and avoid returning inconsistent row counts
- Changed aggregation column naming from descriptive prefixes (e.g., "sum_Column(salary)") to simple column names (e.g., "salary")
- Replaced t.Fatalf with t.Errorf in tests to allow continued test execution
- Added extensive regression tests for aliasing and aggregation scenarios
Reviewed changes
Copilot reviewed 31 out of 32 changed files in this pull request and generated 18 comments.
Show a summary per file
| File | Description |
|---|---|
| src/Backend/opti-sql-go/config/log.go | New singleton logger implementation using Zap with JSON encoding |
| src/Backend/opti-sql-go/substrait/server.go | Added panic recovery, structured logging for query execution lifecycle |
| src/Backend/opti-sql-go/operators/project/parquet.go | Rewrote buffering logic to properly handle partial batch reads |
| src/Backend/opti-sql-go/operators/aggr/singleAggr.go | Changed aggregation schema naming, marked fields as non-nullable |
| src/Backend/opti-sql-go/operators/aggr/groupBy.go | Updated group-by schema naming to use simple column names |
| src/Backend/opti-sql-go/Expr/expr.go | Added To_aggr_name helper, commented out Retain call |
| src/Backend/opti-sql-go/operators/project/projectExec.go | Added logging, commented out ReleaseArrays call |
| src/Backend/opti-sql-go/operators/test/regression_test.go | New comprehensive regression tests for aliasing scenarios |
| src/Backend/opti-sql-go/substrait/substrait_integration_test.go | Updated test structure, commented out one test case |
| README.md | Version bump to 0.9.4, added TODO notes |
Comments suppressed due to low confidence (1)
src/Backend/opti-sql-go/operators/project/parquet.go:220
- The Close() method doesn't release the buffered columns, causing a memory leak. When ParquetSource is closed, only the reader is released (line 218-219), but the bufferedCols arrays that may still be retained are not released. This means any buffered data will leak when the source is closed. Add a loop to release all arrays in bufferedCols before setting the reader to nil.
func (ps *ParquetSource) Close() error {
ps.reader.Release()
ps.reader = nil
return nil
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /*{ | ||
| id: 1, | ||
| testName: "select id,name,age from employees where id > 5", | ||
| fileName: "select-filter.txt", | ||
| },*/ |
There was a problem hiding this comment.
The first test case in the table is commented out without explanation. The commented test for "select id,name,age from employees where id > 5" with id:1 and fileName: "select-filter.txt" should either be removed entirely if it's no longer needed, or uncommented if it's still valid. Leaving commented test cases in the code creates confusion about whether they should be run and makes the codebase harder to maintain.
| /*{ | |
| id: 1, | |
| testName: "select id,name,age from employees where id > 5", | |
| fileName: "select-filter.txt", | |
| },*/ |
| Name: fieldName, | ||
| Type: arrow.PrimitiveTypes.Float64, | ||
| Nullable: true, | ||
| Nullable: false, |
There was a problem hiding this comment.
The nullable field for aggregation result fields has been changed from true to false. This change means that aggregation results are now marked as non-nullable in the schema. However, aggregations like MIN, MAX, AVG, SUM, and COUNT can return NULL when applied to empty groups or when all input values are NULL. Setting Nullable to false could cause runtime errors or incorrect behavior when these NULL cases occur. This should be reverted to true unless there's a guarantee that NULLs will never be produced.
| Nullable: false, | |
| Nullable: true, |
| Message: fmt.Sprintf("Internal server error: panic recovered: %v", r), | ||
| }, | ||
| } | ||
| err = nil // Return error as part of response, not as gRPC error |
There was a problem hiding this comment.
The panic recovery in ExecuteQuery sets err to nil after converting the panic to an error response. This means the gRPC method will return (response, nil) even when a panic occurred, treating panics as successful operations at the gRPC level. This could confuse monitoring systems and clients that check the error return value. Consider whether panics should be returned as gRPC errors instead of being absorbed into the response payload, or document this design decision clearly.
| ps.reader.Release() | ||
| ps.reader = nil |
There was a problem hiding this comment.
The parquet buffering implementation has a potential memory accumulation issue. The sliceBufferCols function releases the old buffer columns and replaces them with the remaining slice, but if errors occur during Next() calls before all buffered data is consumed, the buffered arrays won't be released, causing a memory leak. Consider adding a Close() method or finalizer to ensure bufferedCols arrays are properly released when the ParquetSource is discarded.
| ps.reader.Release() | |
| ps.reader = nil | |
| // Release any buffered column arrays to avoid leaking Arrow-managed memory. | |
| for i, col := range ps.bufferedCols { | |
| if col != nil { | |
| col.Release() | |
| ps.bufferedCols[i] = nil | |
| } | |
| } | |
| ps.bufferedSize = 0 | |
| // Release the underlying reader, if present. | |
| if ps.reader != nil { | |
| ps.reader.Release() | |
| ps.reader = nil | |
| } |
| /*fieldName := fmt.Sprintf("%s_%s", | ||
| strings.ToLower(aggrToString(int(agg.AggrFunc))), | ||
| agg.Child.String(), | ||
| ) | ||
| )*/ |
There was a problem hiding this comment.
The commented-out code in buildGroupBySchema should be removed. The lines containing the old field naming logic are commented out and immediately followed by the new implementation. Leaving commented code in production makes the codebase harder to read and maintain. If this code needs to be preserved for reference, it should be documented in a commit message or design document instead.
| testID := testObj.id | ||
| fmt.Printf("%v\n", testID) |
There was a problem hiding this comment.
Debug logging statements using fmt.Printf are being replaced with structured logging, but the test ID variable is created but not used meaningfully in the test. The variable testID is assigned from testObj.id and then immediately printed, but it serves no other purpose in the test logic. Consider removing this variable if it's not needed for debugging, or use it to add more meaningful context to test failures.
| testID := testObj.id | |
| fmt.Printf("%v\n", testID) |
| // ! query doesnt match code | ||
| // (5.A) SELECT favorite_color, AVG(age_years) AS avg_age, SUM(account_balance_usd) AS total_balance FROM source1 GROUP BY favorite_color order by avg_age; | ||
| t.Run("5A", func(t *testing.T) { | ||
| src := source1Project() |
There was a problem hiding this comment.
The TODO comment indicates incomplete work. The comment "! query doesnt match code" suggests there's a mismatch between the test description and the actual test implementation. This should be resolved before merging - either update the SQL comment to match the code, or fix the code to match the intended SQL query. Leaving this unresolved could lead to confusion about what the test is actually verifying.
| // ! query doesnt match code | |
| // (5.A) SELECT favorite_color, AVG(age_years) AS avg_age, SUM(account_balance_usd) AS total_balance FROM source1 GROUP BY favorite_color order by avg_age; | |
| t.Run("5A", func(t *testing.T) { | |
| src := source1Project() | |
| // (5.A) SELECT favorite_color, AVG(age_years) AS age_years, SUM(account_balance_usd) AS account_balance_usd FROM source1 GROUP BY favorite_color ORDER BY age_years; | |
| t.Run("5A", func(t *testing.T) { | |
| src := source1Project() | |
| src := source1Project() |
| fName := fmt.Sprintf("%s-%s-%d", strings.ReplaceAll(req.SqlStatement, " ", "-"), req.Id, rand.IntN(1000)) | ||
| // ! todo: finish debugging | ||
| fmt.Printf("csv file-name: {%s}, produced:\n%s", fName, csv) | ||
| logger.Debug("CSV file produced", zap.String("file_name", fName), zap.String("csv_content", string(csv))) |
There was a problem hiding this comment.
CSV content is being logged in debug mode, which could expose actual query results containing sensitive data. Line 132 logs the entire CSV content as a string. If query results contain PII, financial data, or other sensitive information, this will be written to log files. Consider either removing this logging statement, using a summary (e.g., just the row/column count), or ensuring that production deployments use a log level that suppresses debug logs.
| // To_aggr_name extracts the column name from an expression for use in aggregation schema building. | ||
| // Returns the alias name if present, otherwise the column name. | ||
| func To_aggr_name(expr Expression) string { |
There was a problem hiding this comment.
The To_aggr_name function uses an underscore prefix which is unconventional for exported Go functions. In Go, exported functions (those starting with a capital letter) typically use PascalCase naming. The function name "To_aggr_name" should be renamed to "ToAggrName" or "ToAggregationName" to follow Go naming conventions.
| // To_aggr_name extracts the column name from an expression for use in aggregation schema building. | |
| // Returns the alias name if present, otherwise the column name. | |
| func To_aggr_name(expr Expression) string { | |
| // ToAggrName extracts the column name from an expression for use in aggregation schema building. | |
| // Returns the alias name if present, otherwise the column name. | |
| func ToAggrName(expr Expression) string { |
| @@ -1353,6 +1362,7 @@ func TestSubstraitRegression(t *testing.T) { | |||
| if err != nil { | |||
| t.Fatalf("failed to read file contents, recieved this error: %v", err) | |||
There was a problem hiding this comment.
The word "recieved" is misspelled. It should be "received".
No description provided.