Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Backend/opti-sql-go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type serverConfig struct {
Host string `yaml:"host"`
Timeout int `yaml:"timeout"`
MaxRequestSizeMB uint64 `yaml:"max_request_size_mb"` // max size of a file upload. passed in by grpc request
RedisPort string `yaml:"redis_port"`
}
Comment thread
Rich-T-kid marked this conversation as resolved.
type batchConfig struct {
Size int `yaml:"size"`
Expand Down Expand Up @@ -68,6 +69,7 @@ var configInstance *Config = &Config{
Host: "0.0.0.0",
Timeout: 30,
MaxRequestSizeMB: 15,
RedisPort: "104.236.210.9",
Comment thread
Rich-T-kid marked this conversation as resolved.
},
Batch: batchConfig{
Size: 1024 * 8, // rows per bathch
Expand Down Expand Up @@ -151,6 +153,9 @@ func mergeConfig(dst *Config, src map[string]interface{}) {
if v, ok := server["max_request_size_mb"].(int); ok {
dst.Server.MaxRequestSizeMB = uint64(v)
}
if v, ok := server["redis_port"].(string); ok {
dst.Server.RedisPort = v
}
}

// =============================
Expand Down
3 changes: 3 additions & 0 deletions src/Backend/opti-sql-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.13 // indirect
github.com/aws/smithy-go v1.23.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-ini/ini v1.67.0 // indirect
github.com/go-jose/go-jose/v4 v4.1.3 // indirect
github.com/goccy/go-json v0.10.3 // indirect
Expand All @@ -41,6 +43,7 @@ require (
github.com/minio/minio-go v6.0.14+incompatible // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/redis/go-redis/v9 v9.17.3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions src/Backend/opti-sql-go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.90.2 h1:DhdbtDl4FdNlj31+xiRXANxEE+eC7
github.com/aws/aws-sdk-go-v2/service/s3 v1.90.2/go.mod h1:+wArOOrcHUevqdto9k1tKOF5++YTe9JEcPSc9Tx2ZSw=
github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM=
github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs=
Expand Down Expand Up @@ -70,6 +74,8 @@ github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.17.3 h1:fN29NdNrE17KttK5Ndf20buqfDZwGNgoUr9qjl1DQx4=
github.com/redis/go-redis/v9 v9.17.3/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
Expand Down
93 changes: 93 additions & 0 deletions src/Backend/opti-sql-go/substrait/GC.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package substrait

import (
"context"
"fmt"
"opti-sql-go/config"
"time"

"github.com/minio/minio-go"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
)

// garbage collection for removing files from s3 storage after expiration
var dontTouchTestFiles = []string{"country_full.csv", "userdata.parquet", "example.txt", "random_test"}

const ignoreFolder = "result-file-cache"
const loggerPrefix = "Garbage-Collection"
const waitTime = time.Second * 5

func garbageCollection() {
logger := config.GetLogger()
logger.Info(fmt.Sprintf("[%v]starting garbage collection, won't touch these files %v", loggerPrefix, dontTouchTestFiles))
config := config.GetConfig()
redisInstance := redis.NewClient(&redis.Options{
Addr: config.Server.RedisAddr + ":6379",
Password: "", // no password
DB: 0, // use default DB
Protocol: 2,
})
secretes := config.Secretes
accessKey := secretes.AccessKey
secretKey := secretes.SecretKey
endpoint := secretes.EndpointURL
bucket := secretes.BucketName
useSSL := true

client, err := minio.New(endpoint, accessKey, secretKey, useSSL)
if err != nil {
logger.Fatal("failed to construct s3 client to delete old files", zap.String("error message", fmt.Sprintf("%v", err)))
}
var failedAttempts = 0
for {
start:
if failedAttempts > 5 {
logger.Warn("removing files has failed over 5 times, check redis and s3 for issues !!!")
}
Comment thread
Rich-T-kid marked this conversation as resolved.
fmt.Printf("waiting %v minutes before check for files to clear from s3", waitTime.Minutes())
time.Sleep(waitTime)
start := time.Now()
entries, err := redisInstance.LRange(context.TODO(), ignoreFolder, 0, -1).Result()
if err != nil {
logger.Error(fmt.Sprintf("failed to read in files from %v", ignoreFolder), zap.Int("fail counter", failedAttempts))
failedAttempts++
goto start // try again
}
// read all the files in s3
doneChan := make(chan struct{})
readCount := 0
var nonValidFiles []string
validMap := buildMap(dontTouchTestFiles, entries)
for fileName := range client.ListObjects(bucket, "", true, doneChan) {
Comment thread
Rich-T-kid marked this conversation as resolved.
if !validMap[fileName.Key] {
nonValidFiles = append(nonValidFiles, fileName.Key)
}
readCount++
}
var removedFiles = 0
for _, invalidFile := range nonValidFiles {
err := client.RemoveObject(bucket, invalidFile)
if err != nil {
logger.Warn(fmt.Sprintf("error removing %v from s3: %v", invalidFile, err))
// log and move on
} else {
removedFiles++
}
}
failedAttempts = 0 // reset failed attempts back to zero
logger.Info("Garbage Collection metrics", zap.Any("to-keep map", validMap), zap.Int("total-files count", readCount), zap.Int("removed-files count", removedFiles), zap.Any("time-taken", time.Since(start)))

}

}
func buildMap(source1 []string, source2 []string) map[string]bool {
result := make(map[string]bool)
for _, k := range source1 {
result[k] = true
}
for _, k := range source2 {
result[k] = true
}
return result
}
2 changes: 1 addition & 1 deletion src/Backend/opti-sql-go/substrait/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ func Start() chan struct{} {
RegisterSSOperationServer(grpcServer, ss)

stopChan := make(chan struct{})

log.Printf("Substrait server listening on port %d", c.Server.Port)
go unifiedShutdownHandler(ss, grpcServer, stopChan)
go garbageCollection()
go func() {
if err := grpcServer.Serve(*ss.listener); err != nil {
log.Fatalf("Failed to serve: %v", err)
Expand Down