diff --git a/src/Backend/opti-sql-go/config/config.go b/src/Backend/opti-sql-go/config/config.go index 9802dab..fba8f7a 100644 --- a/src/Backend/opti-sql-go/config/config.go +++ b/src/Backend/opti-sql-go/config/config.go @@ -28,6 +28,7 @@ type serverConfig struct { Host string `yaml:"host"` Timeout int `yaml:"timeout"` MaxRequestSizeMB uint64 `yaml:"max_request_size_mb"` // max size of a file upload. passed in by grpc request + RedisPort string `yaml:"redis_port"` } type batchConfig struct { Size int `yaml:"size"` @@ -68,6 +69,7 @@ var configInstance *Config = &Config{ Host: "0.0.0.0", Timeout: 30, MaxRequestSizeMB: 15, + RedisPort: "104.236.210.9", }, Batch: batchConfig{ Size: 1024 * 8, // rows per bathch @@ -151,6 +153,9 @@ func mergeConfig(dst *Config, src map[string]interface{}) { if v, ok := server["max_request_size_mb"].(int); ok { dst.Server.MaxRequestSizeMB = uint64(v) } + if v, ok := server["redis_port"].(string); ok { + dst.Server.RedisPort = v + } } // ============================= diff --git a/src/Backend/opti-sql-go/go.mod b/src/Backend/opti-sql-go/go.mod index 05747a4..10bd61f 100644 --- a/src/Backend/opti-sql-go/go.mod +++ b/src/Backend/opti-sql-go/go.mod @@ -27,6 +27,8 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.13 // indirect github.com/aws/smithy-go v1.23.2 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/go-jose/go-jose/v4 v4.1.3 // indirect github.com/goccy/go-json v0.10.3 // indirect @@ -41,6 +43,7 @@ require ( github.com/minio/minio-go v6.0.14+incompatible // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/redis/go-redis/v9 v9.17.3 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // 
indirect go.uber.org/zap v1.27.1 // indirect diff --git a/src/Backend/opti-sql-go/go.sum b/src/Backend/opti-sql-go/go.sum index 283f097..50406cf 100644 --- a/src/Backend/opti-sql-go/go.sum +++ b/src/Backend/opti-sql-go/go.sum @@ -32,9 +32,13 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.90.2 h1:DhdbtDl4FdNlj31+xiRXANxEE+eC7 github.com/aws/aws-sdk-go-v2/service/s3 v1.90.2/go.mod h1:+wArOOrcHUevqdto9k1tKOF5++YTe9JEcPSc9Tx2ZSw= github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs= @@ -70,6 +74,8 @@ github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.17.3 h1:fN29NdNrE17KttK5Ndf20buqfDZwGNgoUr9qjl1DQx4= 
+github.com/redis/go-redis/v9 v9.17.3/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
diff --git a/src/Backend/opti-sql-go/substrait/GC.go b/src/Backend/opti-sql-go/substrait/GC.go
new file mode 100644
index 0000000..a0cb0fe
--- /dev/null
+++ b/src/Backend/opti-sql-go/substrait/GC.go
@@ -0,0 +1,123 @@
+package substrait
+
+import (
+	"context"
+	"fmt"
+	"opti-sql-go/config"
+	"time"
+
+	"github.com/minio/minio-go"
+	"github.com/redis/go-redis/v9"
+	"go.uber.org/zap"
+)
+
+// Garbage collection: removes objects from S3 storage once they are no longer
+// listed in Redis.
+
+// dontTouchTestFiles lists object keys that are never deleted.
+var dontTouchTestFiles = []string{"country_full.csv", "userdata.parquet", "example.txt", "random_test"}
+
+const (
+	// ignoreFolder is the Redis list key whose entries are the object keys to keep.
+	ignoreFolder = "result-file-cache"
+	// loggerPrefix tags the start-up log line of garbageCollection.
+	loggerPrefix = "Garbage-Collection"
+	// waitTime is how long every sweep sleeps before it starts.
+	waitTime = time.Second * 5
+)
+
+// garbageCollection runs forever: after every waitTime pause it reads the
+// keep-list from Redis, lists the whole bucket and removes every object whose
+// key is neither in that list nor in dontTouchTestFiles. It never returns, so
+// it must be started on its own goroutine.
+func garbageCollection() {
+	logger := config.GetLogger()
+	logger.Info(fmt.Sprintf("[%v]starting garbage collection, won't touch these files %v", loggerPrefix, dontTouchTestFiles))
+	// Named cfg so the config package is not shadowed for the rest of the function.
+	cfg := config.GetConfig()
+	redisInstance := redis.NewClient(&redis.Options{
+		// Server.RedisPort is the field config.go declares; its default is a host
+		// address, so the Redis port is appended here.
+		Addr:     cfg.Server.RedisPort + ":6379",
+		Password: "", // no password
+		DB:       0,  // use default DB
+		Protocol: 2,
+	})
+	secretes := cfg.Secretes
+	bucket := secretes.BucketName
+	useSSL := true
+
+	client, err := minio.New(secretes.EndpointURL, secretes.AccessKey, secretes.SecretKey, useSSL)
+	if err != nil {
+		logger.Fatal("failed to construct s3 client to delete old files", zap.String("error message", fmt.Sprintf("%v", err)))
+	}
+	failedAttempts := 0
+	for {
+		if failedAttempts > 5 {
+			logger.Warn("removing files has failed over 5 times, check redis and s3 for issues !!!")
+		}
+		logger.Info(fmt.Sprintf("waiting %v before checking for files to clear from s3", waitTime))
+		time.Sleep(waitTime)
+		start := time.Now()
+		entries, err := redisInstance.LRange(context.TODO(), ignoreFolder, 0, -1).Result()
+		if err != nil {
+			logger.Error(fmt.Sprintf("failed to read in files from %v", ignoreFolder), zap.Int("fail counter", failedAttempts), zap.Error(err))
+			failedAttempts++
+			continue // try again after the next wait
+		}
+		// NOTE(review): an empty Redis list leaves only dontTouchTestFiles protected,
+		// so everything else in the bucket is removed - confirm this is intended.
+		validMap := buildMap(dontTouchTestFiles, entries)
+		nonValidFiles, readCount, err := listStaleObjects(client, bucket, validMap)
+		if err != nil {
+			// Never delete on the basis of a listing that ended in an error.
+			logger.Error(fmt.Sprintf("failed to list objects in %v", bucket), zap.Int("fail counter", failedAttempts), zap.Error(err))
+			failedAttempts++
+			continue
+		}
+		removedFiles := 0
+		for _, invalidFile := range nonValidFiles {
+			if err := client.RemoveObject(bucket, invalidFile); err != nil {
+				// log and move on
+				logger.Warn(fmt.Sprintf("error removing %v from s3: %v", invalidFile, err))
+				continue
+			}
+			removedFiles++
+		}
+		failedAttempts = 0 // reset failed attempts back to zero
+		logger.Info("Garbage Collection metrics", zap.Any("to-keep map", validMap), zap.Int("total-files count", readCount), zap.Int("removed-files count", removedFiles), zap.Any("time-taken", time.Since(start)))
+	}
+}
+
+// listStaleObjects lists every object in bucket and returns the keys missing
+// from keep plus the number of objects seen. A listing error stops the walk
+// and is returned, so no removal is ever based on a partial listing.
+func listStaleObjects(client *minio.Client, bucket string, keep map[string]bool) ([]string, int, error) {
+	doneChan := make(chan struct{})
+	// Closing doneChan lets the lister stop when this function returns early.
+	defer close(doneChan)
+	var stale []string
+	total := 0
+	for obj := range client.ListObjects(bucket, "", true, doneChan) {
+		if obj.Err != nil {
+			return nil, total, obj.Err
+		}
+		total++
+		if !keep[obj.Key] {
+			stale = append(stale, obj.Key)
+		}
+	}
+	return stale, total, nil
+}
+
+// buildMap returns a set containing every string from source1 and source2.
+func buildMap(source1 []string, source2 []string) map[string]bool {
+	result := make(map[string]bool, len(source1)+len(source2))
+	for _, k := range source1 {
+		result[k] = true
+	}
+	for _, k := range source2 {
+		result[k] = true
+	}
+	return result
+}
diff --git a/src/Backend/opti-sql-go/substrait/server.go b/src/Backend/opti-sql-go/substrait/server.go
index 6ae1f42..159154d 100644
--- a/src/Backend/opti-sql-go/substrait/server.go
+++ b/src/Backend/opti-sql-go/substrait/server.go
@@ -181,9 +181,9 @@ func Start() chan struct{} { RegisterSSOperationServer(grpcServer, ss) stopChan := make(chan struct{}) - log.Printf("Substrait server listening on port %d", c.Server.Port) go unifiedShutdownHandler(ss, grpcServer, 
stopChan) + go garbageCollection() go func() { if err := grpcServer.Serve(*ss.listener); err != nil { log.Fatalf("Failed to serve: %v", err)