Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion playground/backend/configs/SDK_PYTHON.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"compile_cmd": "",
"run_cmd": "python3",
"test_cmd": "pytest",
"compile_args": [],
"run_args": []
"run_args": [],
"test_args": []
}
52 changes: 23 additions & 29 deletions playground/backend/internal/code_processing/code_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,37 +123,28 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
return
}

switch sdkEnv.ApacheBeamSdk {
case pb.Sdk_SDK_JAVA, pb.Sdk_SDK_GO:
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest {
if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
return
}
} else {
// Compile
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := executor.Compile(ctxWithTimeout)
var compileError bytes.Buffer
var compileOutput bytes.Buffer
runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)

// Start of the monitoring of background tasks (compile step/cancellation/timeout)
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || (sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest) {
if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
return
}
} else { // in case of Java, Go (not unit test), Scala - need compile step
// Compile
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := executor.Compile(ctxWithTimeout)
var compileError bytes.Buffer
var compileOutput bytes.Buffer
runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)

// Start of the monitoring of background tasks (compile step/cancellation/timeout)
ok, err = reconcileBackgroundTask(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
if err != nil {
return
}
if !ok {
// Compile step is finished, but code couldn't be compiled (some typos for example)
_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
return
}
// Compile step is finished and code is compiled
if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
return
}
if err != nil {
return
}
case pb.Sdk_SDK_PYTHON:
if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
if !ok {// Compile step is finished, but code couldn't be compiled (some typos for example)
_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
return
}// Compile step is finished and code is compiled
if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
return
}
}
Expand Down Expand Up @@ -193,6 +184,9 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
return
}
if !ok {
if runOutput.Error != nil {
runError.Write([]byte(runOutput.Error.Error()))
}
// Run step is finished, but code contains some error (divide by 0 for example)
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
// For Go SDK stdErr was redirected to the log file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func SetupExecutorBuilder(lc *fs_tool.LifeCycle, pipelineOptions string, sdkEnv
WithRunner().
WithCommand(lc.GetAbsoluteExecutableFilePath()).ExecutorBuilder
case pb.Sdk_SDK_PYTHON:
// Nothing is needed for Python
builder = *builder.WithExecutableFileName(lc.GetAbsoluteExecutableFilePath())
case pb.Sdk_SDK_SCIO:
return nil, fmt.Errorf("SCIO is not supported yet")
default:
Expand Down
3 changes: 3 additions & 0 deletions playground/backend/internal/streaming/run_output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type RunOutputWriter struct {
Ctx context.Context
CacheService cache.Cache
PipelineId uuid.UUID
Error error
}

// Write writes len(p) bytes from p to cache with cache.RunOutput subKey.
Expand All @@ -45,6 +46,7 @@ func (row *RunOutputWriter) Write(p []byte) (int, error) {

prevOutput, err := row.CacheService.GetValue(row.Ctx, row.PipelineId, cache.RunOutput)
if err != nil {
row.Error = err
return 0, err
}

Expand All @@ -54,6 +56,7 @@ func (row *RunOutputWriter) Write(p []byte) (int, error) {
// set new cache value
err = row.CacheService.SetValue(row.Ctx, row.PipelineId, cache.RunOutput, str)
if err != nil {
row.Error = err
return 0, err
}
return len(p), nil
Expand Down
2 changes: 1 addition & 1 deletion playground/backend/internal/utils/validators_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func GetValidators(sdk pb.Sdk, filepath string) (*[]validators.Validator, error)
case pb.Sdk_SDK_GO:
val = validators.GetGoValidators(filepath)
case pb.Sdk_SDK_PYTHON:
val = validators.GetPythonValidators()
val = validators.GetPyValidators(filepath)
default:
return nil, fmt.Errorf("incorrect sdk: %s", sdk)
}
Expand Down
34 changes: 30 additions & 4 deletions playground/backend/internal/validators/python_validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,34 @@

package validators

// GetPythonValidators return validators methods that should be applied to Go code
func GetPythonValidators() *[]Validator {
//TODO: Will be added in task [BEAM-13292]
return &[]Validator{}
import (
"beam.apache.org/playground/backend/internal/logger"
"io/ioutil"
"strings"
)

const pyUnitTestPattern = "import unittest"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another option is (unittest.TestCase) .. though I recognize this is hard.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we file a bug to make sure that this works fine? users may accidentaly import unittest without adding an actual test? (maybe?) - perhaps the frontend should show the user that import unittest will make the playground run unit tests.


// GetPyValidators return validators methods that should be applied to Python code
func GetPyValidators(filePath string) *[]Validator {
validatorArgs := make([]interface{}, 1)
validatorArgs[0] = filePath
unitTestValidator := Validator{
Validator: CheckIsUnitTestPy,
Args: validatorArgs,
Name: UnitTestValidatorName,
}
validators := []Validator{unitTestValidator}
return &validators
}

func CheckIsUnitTestPy(args ...interface{}) (bool, error) {
filePath := args[0].(string)
code, err := ioutil.ReadFile(filePath)
if err != nil {
logger.Errorf("Validation: Error during open file: %s, err: %s\n", filePath, err.Error())
return false, err
}
// check whether Python code is unit test code
return strings.Contains(string(code), pyUnitTestPattern), nil
}