Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 4 additions & 331 deletions playground/backend/cmd/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,14 @@ func (controller *playgroundController) GetPrecompiledObjects(ctx context.Contex
bucket := cloud_bucket.New()
sdkToCategories, err := bucket.GetPrecompiledObjects(ctx, info.Sdk, info.Category)
if err != nil {
logger.Errorf("%s: GetPrecompiledObjects(): cloud storage error: %s", err.Error())
logger.Errorf("GetPrecompiledObjects(): cloud storage error: %s", err.Error())
return nil, errors.InternalError("GetPrecompiledObjects(): ", err.Error())
}
response := pb.GetPrecompiledObjectsResponse{SdkCategories: make([]*pb.Categories, 0)}
for sdkName, categories := range *sdkToCategories {
sdkCategory := pb.Categories{Sdk: pb.Sdk(pb.Sdk_value[sdkName]), Categories: make([]*pb.Categories_Category, 0)}
for categoryName, precompiledObjects := range categories {
PutPrecompiledObjectsToCategory(categoryName, &precompiledObjects, &sdkCategory)
utils.PutPrecompiledObjectsToCategory(categoryName, &precompiledObjects, &sdkCategory)
}
response.SdkCategories = append(response.SdkCategories, &sdkCategory)
}
Expand All @@ -179,7 +179,7 @@ func (controller *playgroundController) GetPrecompiledObjectCode(ctx context.Con
cd := cloud_bucket.New()
codeString, err := cd.GetPrecompiledObject(ctx, info.GetCloudPath())
if err != nil {
logger.Errorf("%s: GetPrecompiledObject(): cloud storage error: %s", err.Error())
logger.Errorf("GetPrecompiledObject(): cloud storage error: %s", err.Error())
return nil, errors.InternalError("GetPrecompiledObjects(): ", err.Error())
}
response := pb.GetPrecompiledObjectCodeResponse{Code: *codeString}
Expand All @@ -191,336 +191,9 @@ func (controller *playgroundController) GetPrecompiledObjectOutput(ctx context.C
cd := cloud_bucket.New()
output, err := cd.GetPrecompiledObjectOutput(ctx, info.GetCloudPath())
if err != nil {
logger.Errorf("%s: GetPrecompiledObjectOutput(): cloud storage error: %s", err.Error())
logger.Errorf("GetPrecompiledObjectOutput(): cloud storage error: %s", err.Error())
return nil, errors.InternalError("GetPrecompiledObjectOutput(): ", err.Error())
}
response := pb.GetRunOutputResponse{Output: *output}
return &response, nil
}

// PutPrecompiledObjectsToCategory adds categories with precompiled objects to protobuf object
func PutPrecompiledObjectsToCategory(categoryName string, precompiledObjects *cloud_bucket.PrecompiledObjects, sdkCategory *pb.Categories) {
category := pb.Categories_Category{
CategoryName: categoryName,
PrecompiledObjects: make([]*pb.PrecompiledObject, 0),
}
for _, object := range *precompiledObjects {
category.PrecompiledObjects = append(category.PrecompiledObjects, &pb.PrecompiledObject{
CloudPath: object.CloudPath,
Name: object.Name,
Description: object.Description,
Type: object.Type,
})
}
sdkCategory.Categories = append(sdkCategory.Categories, &category)
}

// setupLifeCycle creates fs_tool.LifeCycle and prepares files and folders needed to code processing
func setupLifeCycle(sdk pb.Sdk, code string, pipelineId uuid.UUID, workingDir string) (*fs_tool.LifeCycle, error) {
// create file system service
lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, workingDir)
if err != nil {
logger.Errorf("%s: RunCode(): NewLifeCycle(): %s\n", pipelineId, err.Error())
return nil, err
}

// create folders
err = lc.CreateFolders()
if err != nil {
logger.Errorf("%s: RunCode(): CreateFolders(): %s\n", pipelineId, err.Error())
return nil, err
}

// create file with code
_, err = lc.CreateExecutableFile(code)
if err != nil {
logger.Errorf("%s: RunCode(): CreateExecutableFile(): %s\n", pipelineId, err.Error())
return nil, err
}
return lc, nil
}

// setupCompileBuilder returns executors.CompileBuilder with validators and compiler based on sdk
func setupCompileBuilder(lc *fs_tool.LifeCycle, sdk pb.Sdk, executorConfig *environment.ExecutorConfig) *executors.CompileBuilder {
filePath := lc.GetAbsoluteExecutableFilePath()
val := setupValidators(sdk, filePath)

compileBuilder := executors.NewExecutorBuilder().
WithValidator().
WithSdkValidators(val).
WithCompiler()

switch sdk {
case pb.Sdk_SDK_JAVA:
workingDir := lc.GetAbsoluteExecutableFilesFolderPath()

compileBuilder = compileBuilder.
WithCommand(executorConfig.CompileCmd).
WithArgs(executorConfig.CompileArgs).
WithFileName(filePath).
WithWorkingDir(workingDir)
}
return compileBuilder
}

// setupRunBuilder returns executors.RunBuilder based on sdk
func setupRunBuilder(pipelineId uuid.UUID, lc *fs_tool.LifeCycle, sdk pb.Sdk, env *environment.Environment, compileBuilder *executors.CompileBuilder) (*executors.RunBuilder, error) {
runBuilder := compileBuilder.
WithRunner().
WithCommand(env.BeamSdkEnvs.ExecutorConfig.RunCmd).
WithArgs(env.BeamSdkEnvs.ExecutorConfig.RunArgs).
WithWorkingDir(lc.GetAbsoluteExecutableFilesFolderPath())

switch sdk {
case pb.Sdk_SDK_JAVA:
className, err := lc.ExecutableName(pipelineId, env.ApplicationEnvs.WorkingDir())
if err != nil {
logger.Errorf("%s: get executable file name: %s\n", pipelineId, err.Error())
return nil, err
}

runBuilder = runBuilder.
WithClassName(className)
}
return runBuilder, nil
}

// setupValidators returns slice of validators.Validator based on sdk
func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
var val *[]validators.Validator
switch sdk {
case pb.Sdk_SDK_JAVA:
val = validators.GetJavaValidators(filepath)
}
return val
}

// processCode validates, compiles and runs code by pipelineId.
// During each operation updates status of execution and saves it into cache:
// - In case of processing works more that timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
// - In case of code processing has been canceled saves playground.Status_STATUS_CANCELED as cache.Status into cache.
// - In case of validation step is failed saves playground.Status_STATUS_VALIDATION_ERROR as cache.Status into cache.
// - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
// - In case of compile step is completed with no errors saves compile output as cache.CompileOutput into cache.
// - In case of run step is failed saves playground.Status_STATUS_RUN_ERROR as cache.Status and run logs as cache.RunError into cache.
// - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
// At the end of this method deletes all created folders.
func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
ctxWithTimeout, finishCtxFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
defer func(lc *fs_tool.LifeCycle) {
finishCtxFunc()
cleanUp(pipelineId, lc)
}(lc)

errorChannel := make(chan error, 1)
dataChannel := make(chan interface{}, 1)
successChannel := make(chan bool, 1)
cancelChannel := make(chan bool, 1)

go cancelCheck(ctxWithTimeout, pipelineId, cancelChannel, cacheService)

// build executor for validate and compile steps
exec := compileBuilder.Build()

// validate
logger.Infof("%s: Validate() ...\n", pipelineId)
validateFunc := exec.Validate()
go validateFunc(successChannel, errorChannel)

if !processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, errorChannel, pb.Status_STATUS_VALIDATION_ERROR, pb.Status_STATUS_PREPARING) {
return
}

// prepare
logger.Infof("%s: Prepare() ...\n", pipelineId)
prepareFunc := exec.Prepare()
go prepareFunc(successChannel, errorChannel)

if !processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, errorChannel, pb.Status_STATUS_PREPARATION_ERROR, pb.Status_STATUS_COMPILING) {
return
}

// compile
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := exec.Compile(ctxWithTimeout)
go func(successCh chan bool, errCh chan error, dataCh chan interface{}) {
// TODO separate stderr from stdout
data, err := compileCmd.CombinedOutput()
dataCh <- data
if err != nil {
errCh <- err
successCh <- false
} else {
successCh <- true
}
}(successChannel, errorChannel, dataChannel)

if !processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, dataChannel, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING) {
return
}

runBuilder, err := setupRunBuilder(pipelineId, lc, sdk, env, compileBuilder)
if err != nil {
logger.Errorf("%s: error during setup runBuilder: %s\n", pipelineId, err.Error())
setToCache(ctxWithTimeout, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
return
}

// build executor for run step
exec = runBuilder.Build()

// run
logger.Infof("%s: Run() ...\n", pipelineId)
runCmd := exec.Run(ctxWithTimeout)
go func(successCh chan bool, errCh chan error, dataCh chan interface{}) {
// TODO separate stderr from stdout
data, err := runCmd.CombinedOutput()
dataCh <- data
if err != nil {
errCh <- err
successChannel <- false
} else {
successChannel <- true
}
}(successChannel, errorChannel, dataChannel)

processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
}

// processStep processes each executor's step with cancel and timeout checks.
// If finishes by canceling, timeout or error - returns false.
// If finishes successfully returns true.
func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChannel, successChannel chan bool, dataChannel chan interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
select {
case <-ctx.Done():
finishByTimeout(ctx, pipelineId, cacheService)
return false
case <-cancelChannel:
processCancel(ctx, cacheService, pipelineId)
return false
case ok := <-successChannel:
var data []byte = nil
if dataChannel != nil {
temp := <-dataChannel
data = temp.([]byte)
}
if !ok {
err := <-errorChannel
processError(ctx, err, data, pipelineId, cacheService, errorCaseStatus)
return false
}
processSuccess(ctx, data, pipelineId, cacheService, successCaseStatus)
}
return true
}

// finishByTimeout is used in case of runCode method finished by timeout
func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)

// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
}

// cancelCheck checks cancel flag for code processing.
// If cancel flag doesn't exist in cache continue working.
// If context is done it means that code processing was finished (successfully/with error/timeout). Return.
// If cancel flag exists, and it is true it means that code processing was canceled. Set true to cancelChannel and return.
func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan bool, cacheService cache.Cache) {
ticker := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case _ = <-ticker.C:
cancel, err := cacheService.GetValue(ctx, pipelineId, cache.Canceled)
if err != nil {
continue
}
if cancel.(bool) {
cancelChannel <- true
}
return
}
}
}

// cleanUp removes all prepared folders for received LifeCycle
func cleanUp(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
logger.Infof("%s: DeleteFolders() ...\n", pipelineId)
if err := lc.DeleteFolders(); err != nil {
logger.Error("%s: DeleteFolders(): %s\n", pipelineId, err.Error())
}
logger.Infof("%s: DeleteFolders() complete\n", pipelineId)
logger.Infof("%s: complete\n", pipelineId)
}

// processError processes error received during processing code via setting a corresponding status and output to cache
func processError(ctx context.Context, err error, data []byte, pipelineId uuid.UUID, cacheService cache.Cache, status pb.Status) {
switch status {
case pb.Status_STATUS_VALIDATION_ERROR:
logger.Errorf("%s: Validate(): %s\n", pipelineId, err.Error())

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_VALIDATION_ERROR)
case pb.Status_STATUS_PREPARATION_ERROR:
logger.Errorf("%s: Prepare(): %s\n", pipelineId, err.Error())

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_PREPARATION_ERROR)
case pb.Status_STATUS_COMPILE_ERROR:
logger.Errorf("%s: Compile(): err: %s, output: %s\n", pipelineId, err.Error(), data)

setToCache(ctx, cacheService, pipelineId, cache.CompileOutput, "error: "+err.Error()+", output: "+string(data))

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_COMPILE_ERROR)
case pb.Status_STATUS_RUN_ERROR:
logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId, err.Error(), data)

setToCache(ctx, cacheService, pipelineId, cache.RunError, "error: "+err.Error()+", output: "+string(data))

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
}
}

// processSuccess processes case after successful code processing via setting a corresponding status and output to cache
func processSuccess(ctx context.Context, output []byte, pipelineId uuid.UUID, cacheService cache.Cache, status pb.Status) {
switch status {
case pb.Status_STATUS_PREPARING:
logger.Infof("%s: Validate() finish\n", pipelineId)

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_PREPARING)
case pb.Status_STATUS_COMPILING:
logger.Infof("%s: Prepare() finish\n", pipelineId)

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_COMPILING)
case pb.Status_STATUS_EXECUTING:
logger.Infof("%s: Compile() finish\n", pipelineId)

setToCache(ctx, cacheService, pipelineId, cache.CompileOutput, string(output))

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_EXECUTING)
case pb.Status_STATUS_FINISHED:
logger.Infof("%s: Run() finish\n", pipelineId)

setToCache(ctx, cacheService, pipelineId, cache.RunOutput, string(output))

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_FINISHED)
}
}

// processCancel process case when code processing was canceled
func processCancel(ctx context.Context, cacheService cache.Cache, pipelineId uuid.UUID) {
logger.Infof("%s: was canceled\n", pipelineId)

// set to cache pipelineId: cache.SubKey_Status: pb.Status_STATUS_CANCELED
setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_CANCELED)
}

// setToCache puts value to cache by key and subKey
func setToCache(ctx context.Context, cacheService cache.Cache, key uuid.UUID, subKey cache.SubKey, value interface{}) error {
err := cacheService.SetValue(ctx, key, subKey, value)
if err != nil {
logger.Errorf("%s: cache.SetValue: %s\n", key, err.Error())
}
return err
}
Loading