diff --git a/playground/backend/configs/SDK_PYTHON.json b/playground/backend/configs/SDK_PYTHON.json index bb6ae89adff5..d1f89d366a52 100644 --- a/playground/backend/configs/SDK_PYTHON.json +++ b/playground/backend/configs/SDK_PYTHON.json @@ -1,6 +1,8 @@ { "compile_cmd": "", "run_cmd": "python3", + "test_cmd": "pytest", "compile_args": [], - "run_args": [] + "run_args": [], + "test_args": [] } diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go index ea7e6114b022..0b4548ec642a 100644 --- a/playground/backend/internal/code_processing/code_processing.go +++ b/playground/backend/internal/code_processing/code_processing.go @@ -123,37 +123,28 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl return } - switch sdkEnv.ApacheBeamSdk { - case pb.Sdk_SDK_JAVA, pb.Sdk_SDK_GO: - if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest { - if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil { - return - } - } else { - // Compile - logger.Infof("%s: Compile() ...\n", pipelineId) - compileCmd := executor.Compile(ctxWithTimeout) - var compileError bytes.Buffer - var compileOutput bytes.Buffer - runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel) - - // Start of the monitoring of background tasks (compile step/cancellation/timeout) + if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || (sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest) { + if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil { + return + } + } else { // in case of Java, Go (not unit test), Scala - need compile step + // Compile + logger.Infof("%s: Compile() ...\n", pipelineId) + compileCmd := executor.Compile(ctxWithTimeout) + var compileError bytes.Buffer + var compileOutput bytes.Buffer + runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel) + + // Start of the monitoring of background tasks (compile step/cancellation/timeout) ok, err = reconcileBackgroundTask(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel) - if err != nil { - return - } - if !ok { - // Compile step is finished, but code couldn't be compiled (some typos for example) - _ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService) - return - } - // Compile step is finished and code is compiled - if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil { - return - } + if err != nil { + return } - case pb.Sdk_SDK_PYTHON: - if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil { + if !ok {// Compile step is finished, but code couldn't be compiled (some typos for example) + _ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService) + return + }// Compile step is finished and code is compiled + if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil { return } } @@ -193,6 +184,9 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl return } if !ok { + if runOutput.Error != nil { + runError.Write([]byte(runOutput.Error.Error())) + } // Run step is finished, but code contains some error (divide by 0 for example) if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO { // For Go SDK stdErr was redirected to the log file. diff --git a/playground/backend/internal/setup_tools/builder/setup_builder.go b/playground/backend/internal/setup_tools/builder/setup_builder.go index 7d3852a74afb..b461d799ec75 100644 --- a/playground/backend/internal/setup_tools/builder/setup_builder.go +++ b/playground/backend/internal/setup_tools/builder/setup_builder.go @@ -86,7 +86,7 @@ func SetupExecutorBuilder(lc *fs_tool.LifeCycle, pipelineOptions string, sdkEnv WithRunner(). WithCommand(lc.GetAbsoluteExecutableFilePath()).ExecutorBuilder case pb.Sdk_SDK_PYTHON: - // Nothing is needed for Python + builder = *builder.WithExecutableFileName(lc.GetAbsoluteExecutableFilePath()) case pb.Sdk_SDK_SCIO: return nil, fmt.Errorf("SCIO is not supported yet") default: diff --git a/playground/backend/internal/streaming/run_output_writer.go b/playground/backend/internal/streaming/run_output_writer.go index 94bd0a119f8a..36b129371733 100644 --- a/playground/backend/internal/streaming/run_output_writer.go +++ b/playground/backend/internal/streaming/run_output_writer.go @@ -27,6 +27,7 @@ type RunOutputWriter struct { Ctx context.Context CacheService cache.Cache PipelineId uuid.UUID + Error error } // Write writes len(p) bytes from p to cache with cache.RunOutput subKey. @@ -45,6 +46,7 @@ func (row *RunOutputWriter) Write(p []byte) (int, error) { prevOutput, err := row.CacheService.GetValue(row.Ctx, row.PipelineId, cache.RunOutput) if err != nil { + row.Error = err return 0, err } @@ -54,6 +56,7 @@ func (row *RunOutputWriter) Write(p []byte) (int, error) { // set new cache value err = row.CacheService.SetValue(row.Ctx, row.PipelineId, cache.RunOutput, str) if err != nil { + row.Error = err return 0, err } return len(p), nil diff --git a/playground/backend/internal/utils/validators_utils.go b/playground/backend/internal/utils/validators_utils.go index 539ac9496391..05e402c31335 100644 --- a/playground/backend/internal/utils/validators_utils.go +++ b/playground/backend/internal/utils/validators_utils.go @@ -30,7 +30,7 @@ func GetValidators(sdk pb.Sdk, filepath string) (*[]validators.Validator, error) case pb.Sdk_SDK_GO: val = validators.GetGoValidators(filepath) case pb.Sdk_SDK_PYTHON: - val = validators.GetPythonValidators() + val = validators.GetPyValidators(filepath) default: return nil, fmt.Errorf("incorrect sdk: %s", sdk) } diff --git a/playground/backend/internal/validators/python_validators.go b/playground/backend/internal/validators/python_validators.go index 4d997337df78..c0628ce937d0 100644 --- a/playground/backend/internal/validators/python_validators.go +++ b/playground/backend/internal/validators/python_validators.go @@ -15,8 +15,34 @@ package validators -// GetPythonValidators return validators methods that should be applied to Go code -func GetPythonValidators() *[]Validator { - //TODO: Will be added in task [BEAM-13292] - return &[]Validator{} +import ( + "beam.apache.org/playground/backend/internal/logger" + "io/ioutil" + "strings" +) + +const pyUnitTestPattern = "import unittest" + +// GetPyValidators return validators methods that should be applied to Python code +func GetPyValidators(filePath string) *[]Validator { + validatorArgs := make([]interface{}, 1) + validatorArgs[0] = filePath + unitTestValidator := Validator{ + Validator: CheckIsUnitTestPy, + Args: validatorArgs, + Name: UnitTestValidatorName, + } + validators := []Validator{unitTestValidator} + return &validators +} + +func CheckIsUnitTestPy(args ...interface{}) (bool, error) { + filePath := args[0].(string) + code, err := ioutil.ReadFile(filePath) + if err != nil { + logger.Errorf("Validation: Error during open file: %s, err: %s\n", filePath, err.Error()) + return false, err + } + // check whether Python code is unit test code + return strings.Contains(string(code), pyUnitTestPattern), nil }