From 0abb34430273b33e40da1508409a242c4ec5deab Mon Sep 17 00:00:00 2001 From: Richard Baah Date: Fri, 5 Dec 2025 02:53:00 -0500 Subject: [PATCH 1/2] feat: generate test substrait plans & basic parse --- src/Backend/opti-sql-go/substrait/substrait.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Backend/opti-sql-go/substrait/substrait.go b/src/Backend/opti-sql-go/substrait/substrait.go index a809ba7..8030f02 100644 --- a/src/Backend/opti-sql-go/substrait/substrait.go +++ b/src/Backend/opti-sql-go/substrait/substrait.go @@ -1 +1,7 @@ package substrait + +// turn raw bytes to substrait plan +// parse substrait plan to in memory format we can work with (abstract syntax tree) +// optimize the plan (apply optimizations) +// run operators based on the optimized plan +// From c25fdec29b7f4e5b31548dbc17e425f689d60eaf Mon Sep 17 00:00:00 2001 From: Richard Baah Date: Sun, 21 Dec 2025 20:29:14 -0500 Subject: [PATCH 2/2] Documentation: attempt to complete ticket #37. first draft --- .../opti-sql-go/operators/project/csv.go | 1 - src/Backend/opti-sql-go/substrait/expr.md | 206 +++++++++++ src/Backend/opti-sql-go/substrait/format.md | 336 ++++++++++++++++++ .../opti-sql-go/substrait/operation.pb.go | 132 ++----- .../substrait/operation_grpc.pb.go | 6 +- src/Backend/opti-sql-go/substrait/server.go | 4 +- .../opti-sql-go/substrait/substrait.go | 1 - .../opti-sql-go/substrait/substrait_test.go | 10 +- src/Contract/operation.proto | 8 +- 9 files changed, 585 insertions(+), 119 deletions(-) create mode 100644 src/Backend/opti-sql-go/substrait/expr.md create mode 100644 src/Backend/opti-sql-go/substrait/format.md diff --git a/src/Backend/opti-sql-go/operators/project/csv.go b/src/Backend/opti-sql-go/operators/project/csv.go index 7f57686..e57e45b 100644 --- a/src/Backend/opti-sql-go/operators/project/csv.go +++ b/src/Backend/opti-sql-go/operators/project/csv.go @@ -204,7 +204,6 @@ func parseDataType(sample string) arrow.DataType { if sample == "" || strings.EqualFold(sample, "NULL") { return arrow.BinaryTypes.String } - // Boolean if sample == "true" || sample == "false" { return arrow.FixedWidthTypes.Boolean diff --git a/src/Backend/opti-sql-go/substrait/expr.md b/src/Backend/opti-sql-go/substrait/expr.md new file mode 100644 index 0000000..bec708c --- /dev/null +++ b/src/Backend/opti-sql-go/substrait/expr.md @@ -0,0 +1,206 @@ +--- +## Expressions + +Expressions are encoded as **tagged objects** using `expr_type`. +They are evaluated row-wise against a record batch and return an Arrow array. + +When applicable, expressions **may include an explicit Arrow type** to avoid inference ambiguity. +--- + +## `Valid Literal Types` + +```bash +"int" +"string" +"boolean" +"float64" +``` + +## `Valid Binary Operators` + +```bash +"Addition" +"Subtraction" +"Multiplication" +"Division" +# comparison +"Equal" +"NotEqual" +"LessThan" +"LessThanOrEqual" +"GreaterThan" +"GreaterThanOrEqual" +# logical +"And" +"Or" +``` + +## `Valid Scalar functions` + +```bash +"Upper" +"Lower" +"Abs" +"Round" +``` + +## `Valid Aggregations functions` + +```bash +"Sum" +"Count" +"Avg" +"Min" +"Max" +``` + +## `ColumnResolve` + +Resolves a column from the input batch. + +```bash +{ + "expr_type": "ColumnResolve", + "name": "a" +} +``` + +--- + +## `LiteralResolve` + +Represents a constant literal value. + +```bash +{ + "expr_type": "LiteralResolve", + "value": 10, + "lit_type": "int" +} +``` + +### Notes + +- `lit_type` is **optional** +- When provided, it must be a valid Arrow primitive type +- If omitted, the engine may infer the type + +--- + +## `BinaryExpr` + +Applies a binary operator to two expressions. + +```bash +{ + "expr_type": "BinaryExpr", + "op": "GreaterThan", # or any (valid) binary operator + "left": { + "expr_type": "ColumnResolve", + "name": "a" + }, + "right": { + "expr_type": "LiteralResolve", + "value": 10, + "lit_type": "int" + } +} +``` + +- Comparison and logical operators must return a boolean array +- Left and right expressions must resolve to compatible Arrow types + +--- + +## `ScalarFunction` + +Applies a scalar function element-wise. + +```bash +{ + "expr_type": "ScalarFunction", + "func": "Upper", # or any (Valid) scalar function + "args": [ + { + "expr_type": "ColumnResolve", + "name": "name" + } + ] +} +``` + +--- + +## `Alias` + +Attaches a name to an expression. + +```bash +{ + "expr_type": "Alias", + "expr": { + "expr_type": "ColumnResolve", + "name": "a" + }, + "name": "alias_a" +} +``` + +- Alias affects **naming only** +- Evaluation result is unchanged + +--- + +## `CastExpr` + +Casts the result of an expression to a specific Arrow type. + +```bash +{ + "expr_type": "CastExpr", + "expr": { + "expr_type": "ColumnResolve", + "name": "a" + }, + "to_type": "Float64" +} +``` + +--- + +## `NullCheckExpr` + +Checks whether values are null or non-null. + +```bash +{ + "expr_type": "NullCheckExpr", + "expr": { + "expr_type": "ColumnResolve", + "name": "a" + }, + "is_null": true +} +``` + +- Produces a boolean mask + +--- + +## Expression Type Enum + +`expr_type` is a **closed enum**. + +```bash +ColumnResolve +LiteralResolve +BinaryExpr +ScalarFunction +Alias +CastExpr +NullCheckExpr +``` + +Each expression object **must** contain exactly one `expr_type`. + +--- diff --git a/src/Backend/opti-sql-go/substrait/format.md b/src/Backend/opti-sql-go/substrait/format.md new file mode 100644 index 0000000..6ecc6ca --- /dev/null +++ b/src/Backend/opti-sql-go/substrait/format.md @@ -0,0 +1,336 @@ +# Custom intermediate in memory representation of sql logical/physical plans + +### why? + +_The primary reason for this layer is flexibility. By decoupling intermediate data representation from Substrait plans, we can accept multiple data formats. As long as we interpret them into this IR, the physical operators work unchanged_ + +## source operator + +```bash +"source-node":{"link-to-s3"} +# file ext must end in .csv or .parquet +``` + +## Project operator + +**sql** : `select a , b , c` + +```bash +"Project": {"input":{operator},"columns":["a","b","c"], "alias":["alias_a","alias_b","alias_c"] +``` + +**alias count must match up with the column count. if a query contains no alias for column at position x , leave alias[x] = "" and the name with remain the same** + +## Filter Operator + +**sql**: `select a,b from source where a > 10` + +```bash +"Filter":{"input":{operator},"expression":{Expression}} +``` + +**Example** + +```bash +"Filter":{"input":{csv_source_exec},"expression":{"expr_type":"LiteralResolve","value":10,"Lit_Type":"int"}} +``` + +--- + +## Distinct Operator + +**sql**: `select distinct a, b from source` + +```bash +"Distinct": { + "input": {operator}, + "columns": ["a", "b"] +} +``` + +- Removes duplicate rows based on the specified columns +- Output includes only the listed columns + +--- + +## Limit Operator + +**sql**: `select a,b from source limit 10` + +```bash +"Limit": { + "input": {operator}, + "limit": 10 +} +``` + +--- + +## Sort Operator + +**sql**: `select a,b from source order by a desc, b asc` + +```bash +"Sort": { + "input": {operator}, + "by": [ + { "column": "a", "order": "DESC" }, + { "column": "b", "order": "ASC" } + ] +} +``` + +- `order` defaults to `ASC` if omitted + +--- + +## Single Column Aggregation Operator + +**sql**: `select sum(a) from source` + +```bash +"Aggregate": { + "input": {operator}, + "function": "Sum", + "column": "a", + "alias": "sum_a" +} +``` + +- Operates on exactly one column +- `alias` is **optional** +- Output contains a single row + +--- + +## Having Operator + +**sql**: `select sum(a) from source having sum(a) > 10` + +```bash +"Having": { + "input": {operator}, + "expression": {Expression} +} +``` + +- Semantics identical to `Filter` +- Applied after aggregation +- Expression must resolve to a boolean mask + +--- + +## Join Operator + +**sql**: +`select * from a join b on a.id = b.id` + +```bash +"Join": { + "left": {operator}, + "right": {operator}, + "join_type": "Inner", + "on": [ + { + "left": { "expr_type": "ColumnResolve", "name": "a.id" }, + "right": { "expr_type": "ColumnResolve", "name": "b.id" } + } + ] +} +``` + +### Supported Join Types + +- `Inner` + +### Notes + +- `on` is an array to support multi-column joins +- Join condition expressions must be **equality comparisons** + +--- + +## Group By Operator + +**sql**: +`select b, sum(a) from source group by b` + +```bash +"GroupBy": { + "input": {operator}, + "group_by": [ + { "expr_type": "ColumnResolve", "name": "b" } + ], + "aggregates": [ + { + "function": "Sum", + "column": "a", + "alias": "sum_a" + } + ] +} +``` + +### Notes + +- `group_by` defines the grouping keys +- Each aggregate operates on **exactly one column** +- `alias` on aggregates is optional +- One output row is produced per group + +--- + +## Example 1 — Source → Filter + +**sql**: `select * from source where a > 10` + +```bash +"Emit": { + "Filter": { + "input": { + "Source": "s3://bucket/data.csv" + }, + "expression": { + "expr_type": "BinaryExpr", + "op": "GreaterThan", + "left": { "expr_type": "ColumnResolve", "name": "a" }, + "right": { + "expr_type": "LiteralResolve", + "value": 10, + "lit_type": "i32" + } + } + } +} +``` + +--- + +## Example 2 — Source → Project → Sort + +**sql**: `select a, b from source order by a` + +```bash +"Emit": { + "Sort": { + "input": { + "Project": { + "input": { + "Source": "s3://bucket/data.csv" + }, + "columns": ["a", "b"], + "alias": ["", ""] + } + }, + "by": [{ "column": "a" }] + } +} +``` + +--- + +## Example 3 — Source → Group By → Aggregate + +**sql**: `select b, count(a) from source group by b` + +```bash +"Emit": { + "GroupBy": { + "input": { + "Source": "s3://bucket/data.csv" + }, + "group_by": [ + { "expr_type": "ColumnResolve", "name": "b" } + ], + "aggregates": [ + { + "function": "Count", + "column": "a", + "alias": "count_a" + } + ] + } +} +``` + +Here’s a **clean final pair** that fits the docs tone: +one **combined but still simple**, one **slightly more advanced** (no deep nesting, no aggregation). + +--- + +## Example 4 — Source → Distinct → Limit + +**sql**: `select distinct a from source limit 5` + +```bash +"Emit": { + "Limit": { + "input": { + "Distinct": { + "input": { + "Source": "s3://bucket/data.csv" + }, + "columns": ["a"] + } + }, + "limit": 5 + } +} +``` + +--- + +## Example 5 — Join → Filter → Sort → Limit + +**sql**: + +```sql +select u.name, o.amount +from users u +join orders o on u.id = o.user_id +where o.amount > 50 +order by o.amount desc +limit 10 +``` + +```bash +"Emit": { + "Limit": { + "input": { + "Sort": { + "input": { + "Filter": { + "input": { + "Join": { + "left": { "Source": "s3://bucket/users.csv" }, + "right": { "Source": "s3://bucket/orders.csv" }, + "join_type": "Inner", + "on": [ + { + "left": { "expr_type": "ColumnResolve", "name": "u.id" }, + "right": { "expr_type": "ColumnResolve", "name": "o.user_id" } + } + ] + } + }, + "expression": { + "expr_type": "BinaryExpr", + "op": "GreaterThan", + "left": { "expr_type": "ColumnResolve", "name": "o.amount" }, + "right": { + "expr_type": "LiteralResolve", + "value": 50, + "lit_type": "i32" + } + } + } + }, + "by": [{ "column": "o.amount", "order": "DESC" }] + } + }, + "limit": 10 + } +} +``` + +--- diff --git a/src/Backend/opti-sql-go/substrait/operation.pb.go b/src/Backend/opti-sql-go/substrait/operation.pb.go index 00f49f2..09a7b52 100644 --- a/src/Backend/opti-sql-go/substrait/operation.pb.go +++ b/src/Backend/opti-sql-go/substrait/operation.pb.go @@ -1,17 +1,18 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 +// protoc-gen-go v1.36.11 // protoc v6.32.0 // source: operation.proto package substrait import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( @@ -84,13 +85,12 @@ func (ReturnTypes) EnumDescriptor() ([]byte, []int) { // The request message containing the operation details. type QueryExecutionRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - SubstraitLogical []byte `protobuf:"bytes,1,opt,name=substrait_logical,json=substraitLogical,proto3" json:"substrait_logical,omitempty"` //SS logical plan - SqlStatement string `protobuf:"bytes,2,opt,name=sql_statement,json=sqlStatement,proto3" json:"sql_statement,omitempty"` // original sql statement - Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` // unique id for this client - Source *SourceType `protobuf:"bytes,4,opt,name=source,proto3" json:"source,omitempty"` // (s3 link| base64 data) - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + LogicalPlan string `protobuf:"bytes,1,opt,name=logical_plan,json=logicalPlan,proto3" json:"logical_plan,omitempty"` // Substrait logical plan: serialized representation of the query execution (contains s3 link to the source data) + SqlStatement string `protobuf:"bytes,2,opt,name=sql_statement,json=sqlStatement,proto3" json:"sql_statement,omitempty"` // original sql statement + Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` // unique id for this client + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *QueryExecutionRequest) Reset() { @@ -123,11 +123,11 @@ func (*QueryExecutionRequest) Descriptor() ([]byte, []int) { return file_operation_proto_rawDescGZIP(), []int{0} } -func (x *QueryExecutionRequest) GetSubstraitLogical() []byte { +func (x *QueryExecutionRequest) GetLogicalPlan() string { if x != nil { - return x.SubstraitLogical + return x.LogicalPlan } - return nil + return "" } func (x *QueryExecutionRequest) GetSqlStatement() string { @@ -144,13 +144,6 @@ func (x *QueryExecutionRequest) GetId() string { return "" } -func (x *QueryExecutionRequest) GetSource() *SourceType { - if x != nil { - return x.Source - } - return nil -} - // The response message containing the result. type QueryExecutionResponse struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -204,58 +197,6 @@ func (x *QueryExecutionResponse) GetErrorType() *ErrorDetails { return nil } -type SourceType struct { - state protoimpl.MessageState `protogen:"open.v1"` - S3Source string `protobuf:"bytes,1,opt,name=s3_source,json=s3Source,proto3" json:"s3_source,omitempty"` // s3 link to the source data - Mime string `protobuf:"bytes,2,opt,name=mime,proto3" json:"mime,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *SourceType) Reset() { - *x = SourceType{} - mi := &file_operation_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *SourceType) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SourceType) ProtoMessage() {} - -func (x *SourceType) ProtoReflect() protoreflect.Message { - mi := &file_operation_proto_msgTypes[2] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SourceType.ProtoReflect.Descriptor instead. -func (*SourceType) Descriptor() ([]byte, []int) { - return file_operation_proto_rawDescGZIP(), []int{2} -} - -func (x *SourceType) GetS3Source() string { - if x != nil { - return x.S3Source - } - return "" -} - -func (x *SourceType) GetMime() string { - if x != nil { - return x.Mime - } - return "" -} - type ErrorDetails struct { state protoimpl.MessageState `protogen:"open.v1"` ErrorType ReturnTypes `protobuf:"varint,1,opt,name=error_type,json=errorType,proto3,enum=contract.ReturnTypes" json:"error_type,omitempty"` @@ -266,7 +207,7 @@ type ErrorDetails struct { func (x *ErrorDetails) Reset() { *x = ErrorDetails{} - mi := &file_operation_proto_msgTypes[3] + mi := &file_operation_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -278,7 +219,7 @@ func (x *ErrorDetails) String() string { func (*ErrorDetails) ProtoMessage() {} func (x *ErrorDetails) ProtoReflect() protoreflect.Message { - mi := &file_operation_proto_msgTypes[3] + mi := &file_operation_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -291,7 +232,7 @@ func (x *ErrorDetails) ProtoReflect() protoreflect.Message { // Deprecated: Use ErrorDetails.ProtoReflect.Descriptor instead. func (*ErrorDetails) Descriptor() ([]byte, []int) { - return file_operation_proto_rawDescGZIP(), []int{3} + return file_operation_proto_rawDescGZIP(), []int{2} } func (x *ErrorDetails) GetErrorType() ReturnTypes { @@ -312,20 +253,15 @@ var File_operation_proto protoreflect.FileDescriptor const file_operation_proto_rawDesc = "" + "\n" + - "\x0foperation.proto\x12\bcontract\"\xa7\x01\n" + - "\x15QueryExecutionRequest\x12+\n" + - "\x11substrait_logical\x18\x01 \x01(\fR\x10substraitLogical\x12#\n" + + "\x0foperation.proto\x12\bcontract\"o\n" + + "\x15QueryExecutionRequest\x12!\n" + + "\flogical_plan\x18\x01 \x01(\tR\vlogicalPlan\x12#\n" + "\rsql_statement\x18\x02 \x01(\tR\fsqlStatement\x12\x0e\n" + - "\x02id\x18\x03 \x01(\tR\x02id\x12,\n" + - "\x06source\x18\x04 \x01(\v2\x14.contract.SourceTypeR\x06source\"u\n" + + "\x02id\x18\x03 \x01(\tR\x02id\"u\n" + "\x16QueryExecutionResponse\x12$\n" + "\x0es3_result_link\x18\x01 \x01(\tR\fs3ResultLink\x125\n" + "\n" + - "error_type\x18\x02 \x01(\v2\x16.contract.ErrorDetailsR\terrorType\"=\n" + - "\n" + - "SourceType\x12\x1b\n" + - "\ts3_source\x18\x01 \x01(\tR\bs3Source\x12\x12\n" + - "\x04mime\x18\x02 \x01(\tR\x04mime\"^\n" + + "error_type\x18\x02 \x01(\v2\x16.contract.ErrorDetailsR\terrorType\"^\n" + "\fErrorDetails\x124\n" + "\n" + "error_type\x18\x01 \x01(\x0e2\x15.contract.returnTypesR\terrorType\x12\x18\n" + @@ -354,25 +290,23 @@ func file_operation_proto_rawDescGZIP() []byte { } var file_operation_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_operation_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_operation_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_operation_proto_goTypes = []any{ (ReturnTypes)(0), // 0: contract.returnTypes (*QueryExecutionRequest)(nil), // 1: contract.QueryExecutionRequest (*QueryExecutionResponse)(nil), // 2: contract.QueryExecutionResponse - (*SourceType)(nil), // 3: contract.SourceType - (*ErrorDetails)(nil), // 4: contract.ErrorDetails + (*ErrorDetails)(nil), // 3: contract.ErrorDetails } var file_operation_proto_depIdxs = []int32{ - 3, // 0: contract.QueryExecutionRequest.source:type_name -> contract.SourceType - 4, // 1: contract.QueryExecutionResponse.error_type:type_name -> contract.ErrorDetails - 0, // 2: contract.ErrorDetails.error_type:type_name -> contract.returnTypes - 1, // 3: contract.SSOperation.ExecuteQuery:input_type -> contract.QueryExecutionRequest - 2, // 4: contract.SSOperation.ExecuteQuery:output_type -> contract.QueryExecutionResponse - 4, // [4:5] is the sub-list for method output_type - 3, // [3:4] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 3, // 0: contract.QueryExecutionResponse.error_type:type_name -> contract.ErrorDetails + 0, // 1: contract.ErrorDetails.error_type:type_name -> contract.returnTypes + 1, // 2: contract.SSOperation.ExecuteQuery:input_type -> contract.QueryExecutionRequest + 2, // 3: contract.SSOperation.ExecuteQuery:output_type -> contract.QueryExecutionResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_operation_proto_init() } @@ -386,7 +320,7 @@ func file_operation_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_operation_proto_rawDesc), len(file_operation_proto_rawDesc)), NumEnums: 1, - NumMessages: 4, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/src/Backend/opti-sql-go/substrait/operation_grpc.pb.go b/src/Backend/opti-sql-go/substrait/operation_grpc.pb.go index 3b87fab..cbe80d5 100644 --- a/src/Backend/opti-sql-go/substrait/operation_grpc.pb.go +++ b/src/Backend/opti-sql-go/substrait/operation_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc v6.32.0 // source: operation.proto @@ -68,7 +68,7 @@ type SSOperationServer interface { type UnimplementedSSOperationServer struct{} func (UnimplementedSSOperationServer) ExecuteQuery(context.Context, *QueryExecutionRequest) (*QueryExecutionResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ExecuteQuery not implemented") + return nil, status.Error(codes.Unimplemented, "method ExecuteQuery not implemented") } func (UnimplementedSSOperationServer) mustEmbedUnimplementedSSOperationServer() {} func (UnimplementedSSOperationServer) testEmbeddedByValue() {} @@ -81,7 +81,7 @@ type UnsafeSSOperationServer interface { } func RegisterSSOperationServer(s grpc.ServiceRegistrar, srv SSOperationServer) { - // If the following call pancis, it indicates UnimplementedSSOperationServer was + // If the following call panics, it indicates UnimplementedSSOperationServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/src/Backend/opti-sql-go/substrait/server.go b/src/Backend/opti-sql-go/substrait/server.go index 5fe5107..8169f36 100644 --- a/src/Backend/opti-sql-go/substrait/server.go +++ b/src/Backend/opti-sql-go/substrait/server.go @@ -27,8 +27,7 @@ func newSubstraitServer(l *net.Listener) *SubstraitServer { // ExecuteQuery implements the gRPC service method func (s *SubstraitServer) ExecuteQuery(ctx context.Context, req *QueryExecutionRequest) (*QueryExecutionResponse, error) { - fmt.Printf("Received query request: logical_plan:%v\n sql:%s\n id:%v\n source: %v\n", req.SubstraitLogical, req.SqlStatement, req.Id, req.Source) - + fmt.Printf("Received query request: logical_plan:%v\n sql:%v\n id:%v\n", req.LogicalPlan, req.SqlStatement, req.Id) // Placeholder response return &QueryExecutionResponse{ S3ResultLink: "", @@ -78,4 +77,5 @@ func unifiedShutdownHandler(s *SubstraitServer, grpcServer *grpc.Server, stopCha grpcServer.GracefulStop() fmt.Println("Server shutdown complete") + os.Exit(1) } diff --git a/src/Backend/opti-sql-go/substrait/substrait.go b/src/Backend/opti-sql-go/substrait/substrait.go index 8030f02..e3edb41 100644 --- a/src/Backend/opti-sql-go/substrait/substrait.go +++ b/src/Backend/opti-sql-go/substrait/substrait.go @@ -4,4 +4,3 @@ package substrait // parse substrait plan to in memory format we can work with (abstract syntax tree) // optimize the plan (apply optimizations) // run operators based on the optimized plan -// diff --git a/src/Backend/opti-sql-go/substrait/substrait_test.go b/src/Backend/opti-sql-go/substrait/substrait_test.go index fe23790..4d5218e 100644 --- a/src/Backend/opti-sql-go/substrait/substrait_test.go +++ b/src/Backend/opti-sql-go/substrait/substrait_test.go @@ -29,13 +29,9 @@ func TestDummyInput(t *testing.T) { t.Errorf("Expected non-nil Substrait server") } dummyRequest := &QueryExecutionRequest{ - SqlStatement: "SELECT * FROM table", - SubstraitLogical: []byte("CgJTUxIMCgpTZWxlY3QgKiBGUk9NIHRhYmxl"), - Id: "GenerateDTMoneyOHaasdavdasvasdvada", - Source: &SourceType{ - S3Source: "s3://my-bucket/data/table.parquet", - Mime: "application/vnd.apache.parquet", - }, + SqlStatement: "SELECT * FROM table", + LogicalPlan: "CgJTUxIMCgpTZWxlY3QgKiBGUk9NIHRhYmxl", + Id: "GenerateDTMoneyOHaasdavdasvasdvada", } resp, err := ss.ExecuteQuery(context.Background(), dummyRequest) if err != nil { diff --git a/src/Contract/operation.proto b/src/Contract/operation.proto index 598386b..fdecf23 100644 --- a/src/Contract/operation.proto +++ b/src/Contract/operation.proto @@ -10,10 +10,10 @@ service SSOperation { // The request message containing the operation details. message QueryExecutionRequest { - bytes substrait_logical = 1; // Substrait logical plan: serialized representation of the query execution + // base64 encoded string of the logical plan (custom IR json format) + string logical_plan = 1; // Substrait logical plan: serialized representation of the query execution (contains s3 link to the source data) string sql_statement = 2; // original sql statement string id = 3; // unique id for this client - SourceType source = 4; // (s3 link| base64 data) } // The response message containing the result. @@ -22,10 +22,6 @@ message QueryExecutionResponse { ErrorDetails error_type = 2; // error type if any } -message SourceType{ - string s3_source = 1; // s3 link to the source data - string mime = 2; -} enum returnTypes{ SUCCESS = 0;