Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 137 additions & 106 deletions playground/backend/internal/code_processing/code_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"io"
"os"
"os/exec"
"path/filepath"
"reflect"
"sync"
"time"
Expand All @@ -61,117 +60,62 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
DeleteFolders(pipelineId, lc)
}(lc)

errorChannel := make(chan error, 1)
successChannel := make(chan bool, 1)
cancelChannel := make(chan bool, 1)
stopReadLogsChannel := make(chan bool, 1)
finishReadLogsChannel := make(chan bool, 1)

var validationResults sync.Map

go cancelCheck(pipelineLifeCycleCtx, pipelineId, cancelChannel, cacheService)

executorBuilder, err := builder.SetupExecutorBuilder(lc.Paths, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
if err != nil {
_ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
executor := validateStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, pipelineLifeCycleCtx, &validationResults, cancelChannel)
if executor == nil {
return
}
executor := executorBuilder.Build()
// Validate
logger.Infof("%s: Validate() ...\n", pipelineId)
validateFunc := executor.Validate()
// Run validate function
go validateFunc(successChannel, errorChannel, &validationResults)

// Start of the monitoring of background tasks (validate function/cancellation/timeout)
ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
if err != nil {
return
}
if !ok {
// Validate step is finished, but code isn't valid
err := <-errorChannel
_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, []byte(err.Error()), pipelineId, cache.ValidationOutput, cacheService, "Validate", pb.Status_STATUS_VALIDATION_ERROR)
return
}
// Validate step is finished and code is valid
if err := processSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, "Validate", pb.Status_STATUS_PREPARING); err != nil {
executor = prepareStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, pipelineLifeCycleCtx, &validationResults, cancelChannel)
if executor == nil {
return
}

// Prepare
logger.Infof("%s: Prepare() ...\n", pipelineId)
prepareFunc := executor.Prepare()
// Run prepare function
go prepareFunc(successChannel, errorChannel, &validationResults)
// Check if is unit test
validateIsUnitTest, _ := validationResults.Load(validators.UnitTestValidatorName)
isUnitTest := validateIsUnitTest.(bool)

// Start of the monitoring of background tasks (prepare function/cancellation/timeout)
ok, err = reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
if err != nil {
return
}
if !ok {
// Prepare step is finished, but code couldn't be prepared (some error during prepare step)
err := <-errorChannel
_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, []byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, "Prepare", pb.Status_STATUS_PREPARATION_ERROR)
return
}
// Prepare step is finished and code is prepared
if err := processSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
executor = compileStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest, pipelineLifeCycleCtx, cancelChannel)
if executor == nil {
return
}

// Check if unit test
validateIsUnitTest, _ := validationResults.Load(validators.UnitTestValidatorName)
isUnitTest := validateIsUnitTest.(bool)
// Run/RunTest
runStep(ctx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions, pipelineLifeCycleCtx, cancelChannel)
}

// This condition is used for cases when the playground doesn't compile source files. For the Python code and the Go Unit Tests
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || (sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest) {
if err := processCompileSuccess(pipelineLifeCycleCtx, []byte(""), pipelineId, cacheService); err != nil {
return
}
} else { // in case of Java, Go (not unit test), Scala - need compile step
// Compile
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
executor = executorBuilder.WithCompiler().
WithFileName(builder.GetFileNameFromFolder(lc.Paths.AbsoluteSourceFileFolderPath, filepath.Ext(lc.Paths.SourceFileName))).Build() // Need changed name for unit tests
}
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := executor.Compile(pipelineLifeCycleCtx)
var compileError bytes.Buffer
var compileOutput bytes.Buffer
runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, isUnitTest bool, sdkEnv *environment.BeamEnvs, pipelineOptions string, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) {
errorChannel, successChannel := createStatusChannels()
stopReadLogsChannel := make(chan bool, 1)
finishReadLogsChannel := make(chan bool, 1)

// Start of the monitoring of background tasks (compile step/cancellation/timeout)
ok, err = reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
if err != nil {
return
}
if !ok { // Compile step is finished, but code couldn't be compiled (some typos for example)
err := <-errorChannel
_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, "Compile", pb.Status_STATUS_COMPILE_ERROR)
return
} // Compile step is finished and code is compiled
if err := processCompileSuccess(pipelineLifeCycleCtx, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
return
}
executorBuilder := executors.NewExecutorBuilder()
err := error(nil)
if isUnitTest {
executorBuilder, err = builder.TestRunner(paths, sdkEnv)
} else {
executorBuilder, err = builder.Runner(paths, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
}

// Run
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
executor, err = setJavaExecutableFile(lc.Paths, pipelineId, cacheService, pipelineLifeCycleCtx, executorBuilder, filepath.Join(appEnv.WorkingDir(), appEnv.PipelinesFolder()))
if err != nil {
return
}
if err != nil {
_ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
return
}
logger.Infof("%s: Run() ...\n", pipelineId)
runCmd := getExecuteCmd(&validationResults, &executor, pipelineLifeCycleCtx)

executor := executorBuilder.Build()
logger.Infof("%s: Run()/Test() ...\n", pipelineId)
runCmd := getExecuteCmd(isUnitTest, &executor, pipelineLifeCycleCtx)
var runError bytes.Buffer
runOutput := streaming.RunOutputWriter{Ctx: pipelineLifeCycleCtx, CacheService: cacheService, PipelineId: pipelineId}
go readLogFile(pipelineLifeCycleCtx, ctx, cacheService, lc.Paths.AbsoluteLogFilePath, pipelineId, stopReadLogsChannel, finishReadLogsChannel)
go readLogFile(pipelineLifeCycleCtx, ctx, cacheService, paths.AbsoluteLogFilePath, pipelineId, stopReadLogsChannel, finishReadLogsChannel)

if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
// For go SDK all logs are placed to stdErr.
file, err := os.Create(lc.Paths.AbsoluteLogFilePath)
file, err := os.Create(paths.AbsoluteLogFilePath)
if err != nil {
// If some error with creating a log file do the same as with other SDK.
logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
Expand All @@ -186,7 +130,7 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
}

// Start of the monitoring of background tasks (run step/cancellation/timeout)
ok, err = reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
if err != nil {
return
}
Expand All @@ -202,7 +146,7 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
// Run step is finished, but code contains some error (divide by 0 for example)
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
// For Go SDK stdErr was redirected to the log file.
errData, err := os.ReadFile(lc.Paths.AbsoluteLogFilePath)
errData, err := os.ReadFile(paths.AbsoluteLogFilePath)
if err != nil {
logger.Errorf("%s: error during read errors from log file (go sdk): %s", pipelineId, err.Error())
}
Expand All @@ -215,30 +159,117 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
_ = processRunSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
}

func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, isUnitTest bool, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) *executors.Executor {
errorChannel, successChannel := createStatusChannels()
var executor = executors.Executor{}
// This condition is used for cases when the playground doesn't compile source files. For the Python code and the Go Unit Tests
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || (sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest) {
if err := processCompileSuccess(pipelineLifeCycleCtx, []byte(""), pipelineId, cacheService); err != nil {
return nil
}
} else { // in case of Java, Go (not unit test), Scala - need compile step
executorBuilder := builder.Compiler(paths, sdkEnv)
executor := executorBuilder.Build()
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := executor.Compile(pipelineLifeCycleCtx)
var compileError bytes.Buffer
var compileOutput bytes.Buffer
runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)

// Start of the monitoring of background tasks (compile step/cancellation/timeout)
ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
if err != nil {
return nil
}
if !ok { // Compile step is finished, but code couldn't be compiled (some typos for example)
err := <-errorChannel
_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, "Compile", pb.Status_STATUS_COMPILE_ERROR)
return nil
} // Compile step is finished and code is compiled
if err := processCompileSuccess(pipelineLifeCycleCtx, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
return nil
}
}
return &executor
}

func prepareStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, pipelineLifeCycleCtx context.Context, validationResults *sync.Map, cancelChannel chan bool) *executors.Executor {
errorChannel, successChannel := createStatusChannels()
executorBuilder, err := builder.Preparer(paths, sdkEnv, validationResults)
if err != nil {
_ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
return nil
}
executor := executorBuilder.Build()
logger.Infof("%s: Prepare() ...\n", pipelineId)
prepareFunc := executor.Prepare()
go prepareFunc(successChannel, errorChannel, validationResults)

// Start of the monitoring of background tasks (prepare function/cancellation/timeout)
ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
if err != nil {
return nil
}
if !ok {
err := <-errorChannel
// Prepare step is finished, but code couldn't be prepared (some error during prepare step)
_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, []byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, "Prepare", pb.Status_STATUS_PREPARATION_ERROR)
return nil
}
// Prepare step is finished and code is prepared
if err := processSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
return nil
}
return &executor
}

func validateStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, pipelineLifeCycleCtx context.Context, validationResults *sync.Map, cancelChannel chan bool) *executors.Executor {
errorChannel, successChannel := createStatusChannels()
executorBuilder, err := builder.Validator(paths, sdkEnv)
if err != nil {
_ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
return nil
}
executor := executorBuilder.Build()
logger.Infof("%s: Validate() ...\n", pipelineId)
validateFunc := executor.Validate()
go validateFunc(successChannel, errorChannel, validationResults)

// Start of the monitoring of background tasks (validate function/cancellation/timeout)
ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
if err != nil {
return nil
}
if !ok {
err := <-errorChannel
// Validate step is finished, but code isn't valid
_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, []byte(err.Error()), pipelineId, cache.ValidationOutput, cacheService, "Validate", pb.Status_STATUS_VALIDATION_ERROR)
return nil
}

// Validate step is finished and code is valid
if err := processSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, "Validate", pb.Status_STATUS_PREPARING); err != nil {
return nil
}
return &executor
}

func createStatusChannels() (chan error, chan bool) {
errorChannel := make(chan error, 1)
successChannel := make(chan bool, 1)
return errorChannel, successChannel
}

// getExecuteCmd return cmd instance based on the code type: unit test or example code
func getExecuteCmd(valRes *sync.Map, executor *executors.Executor, ctxWithTimeout context.Context) *exec.Cmd {
isUnitTest, ok := valRes.Load(validators.UnitTestValidatorName)
func getExecuteCmd(isUnitTest bool, executor *executors.Executor, ctxWithTimeout context.Context) *exec.Cmd {
runType := executors.Run
if ok && isUnitTest.(bool) {
if isUnitTest {
runType = executors.Test
}
cmdReflect := reflect.ValueOf(executor).MethodByName(string(runType)).Call([]reflect.Value{reflect.ValueOf(ctxWithTimeout)})
return cmdReflect[0].Interface().(*exec.Cmd)
}

// setJavaExecutableFile sets executable file name to runner (JAVA class name is known after compilation step)
func setJavaExecutableFile(paths fs_tool.LifeCyclePaths, id uuid.UUID, service cache.Cache, ctx context.Context, executorBuilder *executors.ExecutorBuilder, dir string) (executors.Executor, error) {
className, err := paths.ExecutableName(id, dir)
if err != nil {
if err = processSetupError(err, id, service, ctx); err != nil {
return executorBuilder.Build(), err
}
}
return executorBuilder.
WithExecutableFileName(className).
Build(), nil
}

// processSetupError processes errors during the setting up an executor builder
func processSetupError(err error, pipelineId uuid.UUID, cacheService cache.Cache, ctxWithTimeout context.Context) error {
logger.Errorf("%s: error during setup builder: %s\n", pipelineId, err.Error())
Expand Down
Loading