diff --git a/playground/README.md b/playground/README.md index 958e58ce1d7f..2a4c777f03c1 100644 --- a/playground/README.md +++ b/playground/README.md @@ -35,11 +35,44 @@ The following requirements are needed for development, testing, and deploying. - [Docker Compose](https://docs.docker.com/compose/install/) - [gcloud CLI](https://cloud.google.com/sdk/docs/install) - [gcloud Beta Commands](https://cloud.google.com/sdk/gcloud/reference/components/install) -- [Cloud Datastore Emulator](https://cloud.google.com/sdk/gcloud/reference/components/install) +- [Cloud Datastore Emulator](https://cloud.google.com/datastore/docs/tools/datastore-emulator) +- [sbt](https://www.scala-sbt.org/) + +### Google Cloud Shell Prerequisites Installation +Google Cloud Shell already has most of the prerequisites installed. Only a few tools need to be installed separately. + +#### Flutter +```shell +git config --global --add safe.directory /google/flutter +flutter doctor +``` + +#### Protobuf +```shell +go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28 +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2 +dart pub global activate protoc_plugin +npm install -g @bufbuild/buf +``` +#### sbt +```shell +echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list +echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list +curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import +sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg +sudo apt-get update +sudo apt-get install sbt +``` +### Additional tools +Google Cloud Shell machines do not have `netcat` and `lsof` preinstalled. 
Install them using: +```shell +sudo apt install netcat lsof +``` # Available Gradle Tasks ## Perform overall pre-commit checks +> **Google Cloud Shell note:** run `unset GOOGLE_CLOUD_PROJECT` before running tests so that they use the locally running Datastore emulator. ``` cd beam diff --git a/playground/backend/README.md b/playground/backend/README.md index 1b3ad66f2b42..6659e948d754 100644 --- a/playground/backend/README.md +++ b/playground/backend/README.md @@ -27,7 +27,7 @@ no setup. ## Getting Started -See [playground/README.md](../README.md) for details on requirements and setup. +See [playground/README.md](../README.md) for details on installing development dependencies. This section describes what is needed to run the backend application. @@ -35,35 +35,76 @@ This section describes what is needed to run the backend application. - Set up environment variables to run the backend locally - Running the backend via Docker -### Go commands to run/test application locally +## Go commands to run/test application locally +### Prerequisite + +> **Google Cloud Shell note:** the `start_datastore_emulator.sh` script uses the `nc` and `lsof` commands, which are not installed on Google Cloud Shell machines. You can install them using `sudo apt install netcat lsof`. + +> **Google Cloud Shell note:** run `unset GOOGLE_CLOUD_PROJECT` before running tests so that they use the locally running Datastore emulator. 
+ +Start the Datastore emulator: +```shell +bash start_datastore_emulator.sh +``` + +After you have finished running tests, stop it: +```shell +bash stop_datastore_emulator.sh +``` + +### Run/build Go to the backend directory: ```shell -$ cd backend +cd backend ``` -The following command is used to build and serve the backend locally: +To run the backend server on a development machine without Docker, first prepare a working directory anywhere outside of the Beam source tree: +```shell +mkdir -p ~/path/to/workdir +``` +Then copy `datasets/`, `configs/`, and `logging.properties` from the [`playground/backend/`](/playground/backend/) directory: +```shell +cp -r {logging.properties,datasets/,configs/} ~/path/to/workdir +``` +If you want to start the backend for the Go SDK, you also need to create a prepared mod dir and export an additional environment variable: ```shell -$ go run ./cmd/server/server.go +export PREPARED_MOD_DIR=~/path/to/workdir/prepared_folder +SDK_TAG=2.44.0 bash ./containers/go/setup_sdk.sh "$PREPARED_MOD_DIR" ``` +The following command will build and serve the backend locally: + +```shell +SERVER_PORT= \ +BEAM_SDK= \ +APP_WORK_DIR= \ +DATASTORE_EMULATOR_HOST=127.0.0.1:8888 \ +DATASTORE_PROJECT_ID=test \ +SDK_CONFIG=../sdks-emulator.yaml \ +go run ./cmd/server +``` + +where `SERVER_PORT` is the port on which the backend server should be available; `BEAM_SDK` is the desired Beam SDK, one of `SDK_UNSPECIFIED`, `SDK_JAVA`, `SDK_PYTHON`, `SDK_GO`, `SDK_SCIO`; and `APP_WORK_DIR` is the path to your work dir, e.g. `~/path/to/workdir`. + Run the following command to generate a release build file: ```shell -$ go build ./cmd/server/server.go +go build ./cmd/server/server.go ``` +### Test Playground tests may be run using this command: ```shell -$ go test ... -v +go test ./... -v ``` The full list of commands can be found [here](https://pkg.go.dev/cmd/go). 
-### Set up environment variables to run the backend locally +## Set up environment variables to run the backend locally These environment variables should be set to run the backend locally: @@ -96,7 +137,7 @@ default value and there is no need to set them up to launch locally: - `PROPERTY_PATH` - is the application properties path (default value = `.`) - `CACHE_REQUEST_TIMEOUT` - is the timeout to request data from cache (default value = `5 sec`) -### Application properties +## Application properties These properties are stored in `backend/properties.yaml` file: @@ -106,7 +147,7 @@ These properties are stored in `backend/properties.yaml` file: - `removing_unused_snippets_cron` - is the cron expression for the scheduled task to remove unused snippets. - `removing_unused_snippets_days` - is the number of days after which a snippet becomes unused. -### Running the server app via Docker +## Running the server app via Docker To run the server using Docker images there are `Docker` files in the `containers` folder for Java, Python and Go languages. 
Each of them processes the corresponding SDK, so the backend with Go SDK will work with Go diff --git a/playground/backend/containers/scio/Dockerfile b/playground/backend/containers/scio/Dockerfile index 43feb4ad7744..73d187ea964e 100644 --- a/playground/backend/containers/scio/Dockerfile +++ b/playground/backend/containers/scio/Dockerfile @@ -48,6 +48,7 @@ COPY --from=build /go/src/playground/backend/configs /opt/playground/backend/con COPY --from=build /go/src/playground/backend/logging.properties /opt/playground/backend/ COPY --from=build /go/src/playground/backend/new_scio_project.sh /opt/playground/backend/ COPY --from=build /go/src/playground/backend/internal/fs_tool/ExampleData.scala /opt/playground/backend/ +RUN chmod +x /opt/playground/backend/new_scio_project.sh # Install sbt RUN echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee /etc/apt/sources.list.d/sbt.list &&\ @@ -64,8 +65,6 @@ RUN mkdir /opt/mitmproxy &&\ mkdir /usr/local/share/ca-certificates/extra COPY allow_list_proxy.py /opt/mitmproxy/ COPY allow_list.py /opt/mitmproxy/ -ENV HTTP_PROXY="http://127.0.0.1:8081" -ENV HTTPS_PROXY="http://127.0.0.1:8081" COPY src/properties.yaml /opt/playground/backend/properties.yaml COPY entrypoint.sh / @@ -90,4 +89,18 @@ RUN chown -R appuser:appgroup /opt/playground/backend/executable_files/ \ # Switch to appuser USER appuser +# Let sbt download files from Maven +RUN mkdir -p /tmp/sbt-initialize +WORKDIR /tmp/sbt-initialize +RUN /opt/playground/backend/new_scio_project.sh +WORKDIR /tmp/sbt-initialize/scio +RUN sbt "+compile" +WORKDIR / +RUN rm -r /tmp/sbt-initialize + +# Enable mitmproxy +ENV HTTP_PROXY="http://127.0.0.1:8081" +ENV HTTPS_PROXY="http://127.0.0.1:8081" +ENV SBT_OPTS="-Xmx512M -XX:+UseG1GC -XX:+UseStringDeduplication" + ENTRYPOINT ["/entrypoint.sh"] diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go index b2e432c4f992..23e34240eaad 100644 --- 
a/playground/backend/internal/code_processing/code_processing.go +++ b/playground/backend/internal/code_processing/code_processing.go @@ -424,21 +424,21 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer case <-ticker.C: if _, err := os.Stat(graphFilePath); err == nil { ticker.Stop() - graph, err := utils.ReadFile(pipelineId, graphFilePath) + graph, err := os.ReadFile(graphFilePath) if err != nil { logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error()) } - _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, graph) + _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, string(graph)) } // in case of timeout or cancel case <-pipelineLifeCycleCtx.Done(): ticker.Stop() if _, err := os.Stat(graphFilePath); err == nil { - graph, err := utils.ReadFile(pipelineId, graphFilePath) + graph, err := os.ReadFile(graphFilePath) if err != nil { logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error()) } - _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, graph) + _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, string(graph)) } return } diff --git a/playground/backend/internal/fs_tool/fs.go b/playground/backend/internal/fs_tool/fs.go index d344a8afddb5..f132e71c21da 100644 --- a/playground/backend/internal/fs_tool/fs.go +++ b/playground/backend/internal/fs_tool/fs.go @@ -18,12 +18,9 @@ package fs_tool import ( "beam.apache.org/playground/backend/internal/logger" "fmt" - "io" + "github.com/google/uuid" "io/fs" "os" - "path/filepath" - - "github.com/google/uuid" pb "beam.apache.org/playground/backend/internal/api/v1" "beam.apache.org/playground/backend/internal/db/entity" @@ -114,37 +111,6 @@ func (lc *LifeCycle) CreateSourceCodeFiles(sources []entity.FileEntity) error { return nil } -// CopyFile copies a file with fileName from sourceDir to destinationDir. 
-func (lc *LifeCycle) CopyFile(fileName, sourceDir, destinationDir string) error { - absSourcePath := filepath.Join(sourceDir, fileName) - absDestinationPath := filepath.Join(destinationDir, fileName) - sourceFileStat, err := os.Stat(absSourcePath) - if err != nil { - return err - } - - if !sourceFileStat.Mode().IsRegular() { - return fmt.Errorf("%s is not a regular file", fileName) - } - - sourceFile, err := os.Open(absSourcePath) - if err != nil { - return err - } - defer sourceFile.Close() - - destinationFile, err := os.Create(absDestinationPath) - if err != nil { - return err - } - defer destinationFile.Close() - _, err = io.Copy(destinationFile, sourceFile) - if err != nil { - return err - } - return nil -} - func (lc *LifeCycle) GetPreparerParameters() map[string]string { if lc.emulatorMockCluster == nil { return map[string]string{} diff --git a/playground/backend/internal/fs_tool/fs_test.go b/playground/backend/internal/fs_tool/fs_test.go index 7e4cc36221f7..4241fd9a486c 100644 --- a/playground/backend/internal/fs_tool/fs_test.go +++ b/playground/backend/internal/fs_tool/fs_test.go @@ -135,11 +135,7 @@ func TestLifeCycle_CopyFile(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - l := &LifeCycle{ - folderGlobs: tt.fields.folderGlobs, - Paths: tt.fields.Paths, - } - if err := l.CopyFile(tt.args.fileName, tt.args.sourceDir, tt.args.destinationDir); (err != nil) != tt.wantErr { + if err := utils.CopyFilePreservingName(tt.args.fileName, tt.args.sourceDir, tt.args.destinationDir); (err != nil) != tt.wantErr { t.Errorf("CopyFile() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go index 06aa2e2ddf8d..878d73efb1c5 100644 --- a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go +++ b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go @@ 
-32,7 +32,7 @@ import ( "beam.apache.org/playground/backend/internal/db/entity" "beam.apache.org/playground/backend/internal/fs_tool" "beam.apache.org/playground/backend/internal/logger" - "beam.apache.org/playground/backend/internal/utils" + utils "beam.apache.org/playground/backend/internal/utils" ) const ( @@ -41,13 +41,11 @@ const ( javaLogFilePlaceholder = "{logFilePath}" goModFileName = "go.mod" goSumFileName = "go.sum" - scioProjectName = "y" + bashCmd = "bash" + scioProjectName = "scio" scioProjectPath = scioProjectName + "/src/main/scala/" + scioProjectName logFileName = "logs.log" defaultExampleInSbt = "WordCount.scala" - shCmd = "sh" - rmCmd = "rm" - cpCmd = "cp" scioProject = "new_scio_project.sh" scioCommonConstants = "ExampleData.scala" ) @@ -128,11 +126,11 @@ func Setup(sdk pb.Sdk, sources []entity.FileEntity, pipelineId uuid.UUID, workin // prepareGoFiles prepares file for Go environment. // Copy go.mod and go.sum file from /path/to/preparedModDir to /path/to/workingDir/pipelinesFolder/{pipelineId} func prepareGoFiles(lc *fs_tool.LifeCycle, preparedModDir string, pipelineId uuid.UUID) error { - if err := lc.CopyFile(goModFileName, preparedModDir, lc.Paths.AbsoluteBaseFolderPath); err != nil { + if err := utils.CopyFilePreservingName(goModFileName, preparedModDir, lc.Paths.AbsoluteBaseFolderPath); err != nil { logger.Errorf("%s: error during copying %s file: %s\n", pipelineId, goModFileName, err.Error()) return err } - if err := lc.CopyFile(goSumFileName, preparedModDir, lc.Paths.AbsoluteBaseFolderPath); err != nil { + if err := utils.CopyFilePreservingName(goSumFileName, preparedModDir, lc.Paths.AbsoluteBaseFolderPath); err != nil { logger.Errorf("%s: error during copying %s file: %s\n", pipelineId, goSumFileName, err.Error()) return err } @@ -144,7 +142,7 @@ func prepareGoFiles(lc *fs_tool.LifeCycle, preparedModDir string, pipelineId uui // // and update this file according to pipeline. 
func prepareJavaFiles(lc *fs_tool.LifeCycle, workingDir string, pipelineId uuid.UUID) error { - err := lc.CopyFile(javaLogConfigFileName, workingDir, lc.Paths.AbsoluteBaseFolderPath) + err := utils.CopyFilePreservingName(javaLogConfigFileName, workingDir, lc.Paths.AbsoluteBaseFolderPath) if err != nil { logger.Errorf("%s: error during copying logging.properties file: %s\n", pipelineId, err.Error()) return err @@ -194,7 +192,7 @@ func updateJavaLogConfigFile(paths fs_tool.LifeCyclePaths) error { } func prepareSbtFiles(lc *fs_tool.LifeCycle, pipelineFolder string, workingDir string) (*fs_tool.LifeCycle, error) { - cmd := exec.Command(shCmd, filepath.Join(workingDir, scioProject)) + cmd := exec.Command(bashCmd, filepath.Join(workingDir, scioProject)) cmd.Dir = pipelineFolder _, err := cmd.Output() if err != nil { @@ -210,30 +208,29 @@ func prepareSbtFiles(lc *fs_tool.LifeCycle, pipelineFolder string, workingDir st projectFolder, _ := filepath.Abs(filepath.Join(pipelineFolder, scioProjectName)) executableName := lc.Paths.ExecutableName - _, err = exec.Command(rmCmd, filepath.Join(absFileFolderPath, defaultExampleInSbt)).Output() + err = os.Remove(filepath.Join(absFileFolderPath, defaultExampleInSbt)) if err != nil { return lc, err } - _, err = exec.Command(cpCmd, filepath.Join(workingDir, scioCommonConstants), absFileFolderPath).Output() + err = utils.CopyFilePreservingName(scioCommonConstants, workingDir, absFileFolderPath) if err != nil { return lc, err } - lc = &fs_tool.LifeCycle{ - Paths: fs_tool.LifeCyclePaths{ - SourceFileName: fileName, - AbsoluteSourceFileFolderPath: absFileFolderPath, - AbsoluteSourceFilePath: absFilePath, - ExecutableFileName: fileName, - AbsoluteExecutableFileFolderPath: absFileFolderPath, - AbsoluteExecutableFilePath: absFilePath, - AbsoluteBaseFolderPath: absFileFolderPath, - AbsoluteLogFilePath: absLogFilePath, - AbsoluteGraphFilePath: absGraphFilePath, - ProjectDir: projectFolder, - }, - } - lc.Paths.ExecutableName = executableName + 
lc.Paths = fs_tool.LifeCyclePaths{ + SourceFileName: fileName, + AbsoluteSourceFileFolderPath: absFileFolderPath, + AbsoluteSourceFilePath: absFilePath, + ExecutableFileName: fileName, + AbsoluteExecutableFileFolderPath: absFileFolderPath, + AbsoluteExecutableFilePath: absFilePath, + AbsoluteBaseFolderPath: absFileFolderPath, + AbsoluteLogFilePath: absLogFilePath, + AbsoluteGraphFilePath: absGraphFilePath, + ProjectDir: projectFolder, + ExecutableName: executableName, + } + return lc, nil } diff --git a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go index 74776f527294..e709e87ea7cd 100644 --- a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go +++ b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go @@ -285,7 +285,13 @@ func TestSetup(t *testing.T) { pipelinesFolder: pipelinesFolder, }, prep: func() error { - _, err := os.Create(filepath.Join(workingDir, scioCommonConstants)) + sourceScioShFile := "../../../new_scio_project.sh" + scioShFile := filepath.Join(workingDir, scioProject) + err := utils.CopyFile(sourceScioShFile, scioShFile) + if err != nil { + return err + } + _, err = os.Create(filepath.Join(workingDir, scioCommonConstants)) if err != nil { return err } diff --git a/playground/backend/internal/utils/common.go b/playground/backend/internal/utils/common.go index 7af60cba6f64..9720ed5df9e3 100644 --- a/playground/backend/internal/utils/common.go +++ b/playground/backend/internal/utils/common.go @@ -17,9 +17,12 @@ package utils import ( "beam.apache.org/playground/backend/internal/logger" - "github.com/google/uuid" + "fmt" "gopkg.in/yaml.v3" + "io" "io/ioutil" + "os" + "path/filepath" "regexp" ) @@ -28,16 +31,6 @@ func ReduceWhiteSpacesToSinge(s string) string { return re.ReplaceAllString(s, " ") } -// ReadFile reads from file and returns string. 
-func ReadFile(pipelineId uuid.UUID, path string) (string, error) { - content, err := ioutil.ReadFile(path) - if err != nil { - logger.Errorf("%s: ReadFile(): error during reading from a file: %s", pipelineId, err.Error()) - return "", err - } - return string(content), nil -} - // ReadYamlFile reads from a yaml file. func ReadYamlFile(filename string, out interface{}) error { buf, err := ioutil.ReadFile(filename) @@ -51,3 +44,39 @@ func ReadYamlFile(filename string, out interface{}) error { } return nil } + +// CopyFilePreservingName copies a file with fileName from sourceDir to destinationDir. +func CopyFilePreservingName(fileName, sourceDir, destinationDir string) error { + absSourcePath := filepath.Join(sourceDir, fileName) + absDestinationPath := filepath.Join(destinationDir, fileName) + return CopyFile(absSourcePath, absDestinationPath) +} + +// CopyFile copies a file from sourcePath to destinationPath +func CopyFile(sourcePath, destinationPath string) error { + sourceFileStat, err := os.Stat(sourcePath) + if err != nil { + return err + } + + if !sourceFileStat.Mode().IsRegular() { + return fmt.Errorf("%s is not a regular file", sourcePath) + } + + sourceFile, err := os.Open(sourcePath) + if err != nil { + return err + } + defer sourceFile.Close() + + destinationFile, err := os.Create(destinationPath) + if err != nil { + return err + } + defer destinationFile.Close() + _, err = io.Copy(destinationFile, sourceFile) + if err != nil { + return err + } + return nil +} diff --git a/playground/backend/internal/utils/common_test.go b/playground/backend/internal/utils/common_test.go index b62c8d3fb5f2..7303582f1010 100644 --- a/playground/backend/internal/utils/common_test.go +++ b/playground/backend/internal/utils/common_test.go @@ -16,52 +16,9 @@ package utils import ( - "fmt" - "github.com/google/uuid" - "os" - "path/filepath" "testing" ) -const ( - sourceDir = "sourceDir" - fileName = "file.txt" - fileContent = "content" - javaFileName = "javaFileName.java" - 
pythonExampleName = "wordCount.py" - wordCountPython = "import argparse\nimport logging\nimport re\n\nimport apache_beam as beam\nfrom apache_beam.io import ReadFromText\nfrom apache_beam.io import WriteToText\nfrom apache_beam.options.pipeline_options import PipelineOptions\nfrom apache_beam.options.pipeline_options import SetupOptions\n\n\nclass WordExtractingDoFn(beam.DoFn):\n \"\"\"Parse each line of input text into words.\"\"\"\n def process(self, element):\n \"\"\"Returns an iterator over the words of this element.\n\n The element is a line of text. If the line is blank, note that, too.\n\n Args:\n element: the element being processed\n\n Returns:\n The processed element.\n \"\"\"\n return re.findall(r'[\\w\\']+', element, re.UNICODE)\n\n\ndef run(argv=None, save_main_session=True):\n \"\"\"Main entry point; defines and runs the wordcount pipeline.\"\"\"\n parser = argparse.ArgumentParser()\n parser.add_argument(\n '--input',\n dest='input',\n default='gs://dataflow-samples/shakespeare/kinglear.txt',\n help='Input file to process.')\n parser.add_argument(\n '--output',\n dest='output',\n required=True,\n help='Output file to write results to.')\n known_args, pipeline_args = parser.parse_known_args(argv)\n\n # We use the save_main_session option because one or more DoFn's in this\n # workflow rely on global context (e.g., a module imported at module level).\n pipeline_options = PipelineOptions(pipeline_args)\n pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n\n # The pipeline will be run on exiting the with block.\n with beam.Pipeline(options=pipeline_options) as p:\n\n # Read the text file[pattern] into a PCollection.\n lines = p | 'Read' >> ReadFromText(known_args.input)\n\n counts = (\n lines\n | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))\n | 'PairWithOne' >> beam.Map(lambda x: (x, 1))\n | 'GroupAndSum' >> beam.CombinePerKey(sum))\n\n # Format the counts into a PCollection of strings.\n def 
format_result(word, count):\n return '%s: %d' % (word, count)\n\n output = counts | 'Format' >> beam.MapTuple(format_result)\n\n # Write the output using a \"Write\" transform that has side effects.\n # pylint: disable=expression-not-assigned\n output | 'Write' >> WriteToText(known_args.output)\n\n\nif __name__ == '__main__':\n logging.getLogger().setLevel(logging.INFO)\n run()" - javaCode = "package org.apache.beam.examples;\n\n// beam-playground:\n// name: MinimalWordCount\n// description: An example that counts words in Shakespeare's works.\n// multifile: false\n// default_example: true\n// context_line: 71\n// categories:\n// - Combiners\n// - Filtering\n// - IO\n// - Core Transforms\n// - Quickstart\n\nimport java.util.Arrays;\nimport org.apache.beam.sdk.Pipeline;\nimport org.apache.beam.sdk.io.TextIO;\nimport org.apache.beam.sdk.options.PipelineOptions;\nimport org.apache.beam.sdk.options.PipelineOptionsFactory;\nimport org.apache.beam.sdk.transforms.Count;\nimport org.apache.beam.sdk.transforms.Filter;\nimport org.apache.beam.sdk.transforms.FlatMapElements;\nimport org.apache.beam.sdk.transforms.MapElements;\nimport org.apache.beam.sdk.values.KV;\nimport org.apache.beam.sdk.values.TypeDescriptors;\n\n/**\n * An example that counts words in Shakespeare.\n *\n * <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more\n * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or\n * argument processing, and focus on construction of the pipeline, which chains together the\n * application of core transforms.\n *\n * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the\n * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional\n * concepts.\n *\n * <p>Concepts:\n *\n * <pre>\n *   1. Reading data from text files\n *   2. Specifying 'inline' transforms\n *   3. Counting items in a PCollection\n *   4. Writing data to text files\n * </pre>\n *\n * <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You\n * can see the results in the output files in your current working directory, with names like\n * \"wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate\n * file service.\n */\npublic class MinimalWordCount {\n\n public static void main(String[] args) {\n\n // Create a PipelineOptions object. This object lets us set various execution\n // options for our pipeline, such as the runner you wish to use. This example\n // will run with the DirectRunner by default, based on the class path configured\n // in its dependencies.\n PipelineOptions options = PipelineOptionsFactory.create();\n\n // In order to run your pipeline, you need to make following runner specific changes:\n //\n // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner\n // or FlinkRunner.\n // CHANGE 2/3: Specify runner-required options.\n // For BlockingDataflowRunner, set project and temp location as follows:\n // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);\n // dataflowOptions.setRunner(BlockingDataflowRunner.class);\n // dataflowOptions.setProject(\"SET_YOUR_PROJECT_ID_HERE\");\n // dataflowOptions.setTempLocation(\"gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY\");\n // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}\n // for more details.\n // options.as(FlinkPipelineOptions.class)\n // .setRunner(FlinkRunner.class);\n\n // Create the Pipeline object with the options we defined above\n Pipeline p = Pipeline.create(options);\n\n // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set\n // of input text files. 
TextIO.Read returns a PCollection where each element is one line from\n // the input text (a set of Shakespeare's texts).\n\n // This example reads from a public dataset containing the text of King Lear.\n p.apply(TextIO.read().from(\"gs://apache-beam-samples/shakespeare/kinglear.txt\"))\n\n // Concept #2: Apply a FlatMapElements transform the PCollection of text lines.\n // This transform splits the lines in PCollection, where each element is an\n // individual word in Shakespeare's collected texts.\n .apply(\n FlatMapElements.into(TypeDescriptors.strings())\n .via((String line) -> Arrays.asList(line.split(\"[^\\\\p{L}]+\"))))\n // We use a Filter transform to avoid empty word\n .apply(Filter.by((String word) -> !word.isEmpty()))\n // Concept #3: Apply the Count transform to our PCollection of individual words. The Count\n // transform returns a new PCollection of key/value pairs, where each key represents a\n // unique word in the text. The associated value is the occurrence count for that word.\n .apply(Count.perElement())\n // Apply a MapElements transform that formats our PCollection of word counts into a\n // printable string, suitable for writing to an output file.\n .apply(\n MapElements.into(TypeDescriptors.strings())\n .via(\n (KV wordCount) ->\n wordCount.getKey() + \": \" + wordCount.getValue()))\n // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.\n // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of\n // formatted strings) to a series of text files.\n //\n // By default, it will write to a set of files with names like wordcounts-00001-of-00005\n .apply(TextIO.write().to(\"wordcounts\"));\n\n p.run().waitUntilFinish();\n }\n}" - filePermission = 0600 - fullPermission = 0755 -) - -func TestMain(m *testing.M) { - err := setup() - if err != nil { - panic(fmt.Errorf("error during test setup: %s", err.Error())) - } - defer teardown() - m.Run() -} - -func setup() error { - err := 
os.Mkdir(sourceDir, fullPermission) - if err != nil { - return err - } - filePath := filepath.Join(sourceDir, fileName) - err = os.WriteFile(filePath, []byte(fileContent), filePermission) - javaFilePath := filepath.Join(sourceDir, javaFileName) - err = os.WriteFile(javaFilePath, []byte(javaCode), filePermission) - wordCountPythonPath := filepath.Join(sourceDir, pythonExampleName) - err = os.WriteFile(wordCountPythonPath, []byte(wordCountPython), filePermission) - return err -} - -func teardown() error { - return os.RemoveAll(sourceDir) -} - func TestReduceWhiteSpacesToSinge(t *testing.T) { type args struct { s string @@ -82,47 +39,3 @@ func TestReduceWhiteSpacesToSinge(t *testing.T) { }) } } - -func TestReadFile(t *testing.T) { - type args struct { - pipelineId uuid.UUID - path string - } - tests := []struct { - name string - args args - want string - wantErr bool - }{ - { - name: "Read from existing file", - args: args{ - pipelineId: uuid.New(), - path: filepath.Join(sourceDir, fileName), - }, - want: fileContent, - wantErr: false, - }, - { - name: "Read from non-existent file", - args: args{ - pipelineId: uuid.New(), - path: filepath.Join(sourceDir, "non-existent_file.txt"), - }, - want: "", - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := ReadFile(tt.args.pipelineId, tt.args.path) - if (err != nil) != tt.wantErr { - t.Errorf("ReadFile() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("ReadFile() got = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/playground/backend/internal/utils/preparers_utils.go b/playground/backend/internal/utils/preparers_utils.go index 7f9867d6f631..5a446ceaaeec 100644 --- a/playground/backend/internal/utils/preparers_utils.go +++ b/playground/backend/internal/utils/preparers_utils.go @@ -148,7 +148,12 @@ func GetPublicClassName(filePath, pattern string) (string, error) { return "", err } re := regexp.MustCompile(pattern) - 
className := re.FindStringSubmatch(string(code))[1] + classNameMatch := re.FindStringSubmatch(string(code)) + if len(classNameMatch) < 2 { + return "", fmt.Errorf("unable to find main class name in file %s", filePath) + } + + className := classNameMatch[1] return className, err } diff --git a/playground/backend/internal/utils/preparers_utils_test.go b/playground/backend/internal/utils/preparers_utils_test.go index 498b2d964e5f..72f0eab5cb41 100644 --- a/playground/backend/internal/utils/preparers_utils_test.go +++ b/playground/backend/internal/utils/preparers_utils_test.go @@ -27,6 +27,58 @@ import ( "testing" ) +const ( + sourceDir = "sourceDir" + fileName = "file.txt" + fileContent = "content" + testDataDir = "test_data" + javaFileName = "JavaFileName.java" + emptyFileName = "emptyFile.java" + pythonExampleName = "wordcount.py" + filePermission = 0600 + fullPermission = 0755 +) + +func TestMain(m *testing.M) { + err := setup() + if err != nil { + panic(fmt.Errorf("error during test setup: %s", err.Error())) + } + defer teardown() + m.Run() +} + +func setup() error { + err := os.Mkdir(sourceDir, fullPermission) + if err != nil { + return err + } + filePath := filepath.Join(sourceDir, fileName) + err = os.WriteFile(filePath, []byte(fileContent), filePermission) + if err != nil { + return err + } + sourceJavaFilePath := filepath.Join(testDataDir, javaFileName) + javaFilePath := filepath.Join(sourceDir, javaFileName) + err = CopyFile(sourceJavaFilePath, javaFilePath) + if err != nil { + return err + } + emptyFilePath := filepath.Join(sourceDir, emptyFileName) + err = os.WriteFile(emptyFilePath, []byte(""), filePermission) + if err != nil { + return err + } + sourceWordCountPythonPath := filepath.Join(testDataDir, pythonExampleName) + wordCountPythonPath := filepath.Join(sourceDir, pythonExampleName) + err = CopyFile(sourceWordCountPythonPath, wordCountPythonPath) + return err +} + +func teardown() error { + return os.RemoveAll(sourceDir) +} + func 
TestSpacesToEqualsOption(t *testing.T) { type args struct { pipelineOptions string @@ -177,6 +229,15 @@ func TestGetPublicClassName(t *testing.T) { want: "MinimalWordCount", wantErr: false, }, + { + name: "Get public class name from empty file", + args: args{ + filePath: filepath.Join(sourceDir, emptyFileName), + pattern: javaPublicClassNamePattern, + }, + want: "", + wantErr: true, + }, { name: "Get public class name from non-existent file", args: args{ diff --git a/playground/backend/internal/utils/test_data/JavaFileName.java b/playground/backend/internal/utils/test_data/JavaFileName.java new file mode 100644 index 000000000000..9eb8e0e54cf6 --- /dev/null +++ b/playground/backend/internal/utils/test_data/JavaFileName.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples; + +public class MinimalWordCount { + public static void main(String[] args) { + } +} diff --git a/playground/backend/internal/utils/test_data/wordcount.py b/playground/backend/internal/utils/test_data/wordcount.py new file mode 100644 index 000000000000..f4e32c902b0b --- /dev/null +++ b/playground/backend/internal/utils/test_data/wordcount.py @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A word-counting workflow.""" + +# pytype: skip-file + +# beam-playground: +# name: WordCount +# description: An example that counts words in Shakespeare's works. 
+# multifile: false +# pipeline_options: --output output.txt +# context_line: 44 +# categories: +# - Combiners +# - Options +# - Quickstart +# complexity: MEDIUM +# tags: +# - options +# - count +# - combine +# - strings + +import argparse +import logging +import re + +import apache_beam as beam +from apache_beam.io import ReadFromText +from apache_beam.io import WriteToText +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions + + +class WordExtractingDoFn(beam.DoFn): + """Parse each line of input text into words.""" + def process(self, element): + """Returns an iterator over the words of this element. + + The element is a line of text. If the line is blank, note that, too. + + Args: + element: the element being processed + + Returns: + The processed element. + """ + return re.findall(r'[\w\']+', element, re.UNICODE) + + +def run(argv=None, save_main_session=True): + """Main entry point; defines and runs the wordcount pipeline.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + dest='input', + default='gs://dataflow-samples/shakespeare/kinglear.txt', + help='Input file to process.') + parser.add_argument( + '--output', + dest='output', + required=True, + help='Output file to write results to.') + known_args, pipeline_args = parser.parse_known_args(argv) + + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # The pipeline will be run on exiting the with block. + with beam.Pipeline(options=pipeline_options) as p: + + # Read the text file[pattern] into a PCollection. 
+ lines = p | 'Read' >> ReadFromText(known_args.input) + + counts = ( + lines + | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) + | 'GroupAndSum' >> beam.CombinePerKey(sum)) + + # Format the counts into a PCollection of strings. + def format_result(word, count): + return '%s: %d' % (word, count) + + output = counts | 'Format' >> beam.MapTuple(format_result) + + # Write the output using a "Write" transform that has side effects. + # pylint: disable=expression-not-assigned + output | 'Write' >> WriteToText(known_args.output) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/playground/backend/new_scio_project.sh b/playground/backend/new_scio_project.sh old mode 100644 new mode 100755 index a01fc78c923c..80e031f0fd8c --- a/playground/backend/new_scio_project.sh +++ b/playground/backend/new_scio_project.sh @@ -15,4 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -yes | sbt new spotify/scio-template.g8 +{ printf scio\\nscio\\n; yes; } | sbt new spotify/scio-template.g8 + +echo "Compile / run / fork := false" >> scio/build.sbt diff --git a/playground/docker-compose.local.yaml b/playground/docker-compose.local.yaml index e202b6a4dca7..92121a2f0c5d 100644 --- a/playground/docker-compose.local.yaml +++ b/playground/docker-compose.local.yaml @@ -91,6 +91,10 @@ services: - "8090:8090" depends_on: - redis + # Impose limits on container to better mimic real deployment environment + mem_limit: 2048M + ulimits: + rss: 1073741824 frontend: image: apache/beam_playground-frontend