From 5c6561021b0d0395d1fc95e67bc75375292f8caa Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Mon, 9 Jan 2023 18:25:15 +0400 Subject: [PATCH 01/17] Initialize SBT cache during SCIO playground container build Run sbt tool during container build to let it download Scala dependencies from Maven during build time instead of having to wait for downloading all dependencies during first run of examples in container --- playground/backend/containers/scio/Dockerfile | 13 +++++++++++-- playground/backend/new_scio_project.sh | 0 2 files changed, 11 insertions(+), 2 deletions(-) mode change 100644 => 100755 playground/backend/new_scio_project.sh diff --git a/playground/backend/containers/scio/Dockerfile b/playground/backend/containers/scio/Dockerfile index 6df3c0ad4b8c..0aa1ff54b8dc 100644 --- a/playground/backend/containers/scio/Dockerfile +++ b/playground/backend/containers/scio/Dockerfile @@ -48,6 +48,7 @@ COPY --from=build /go/src/playground/backend/configs /opt/playground/backend/con COPY --from=build /go/src/playground/backend/logging.properties /opt/playground/backend/ COPY --from=build /go/src/playground/backend/new_scio_project.sh /opt/playground/backend/ COPY --from=build /go/src/playground/backend/internal/fs_tool/ExampleData.scala /opt/playground/backend/ +RUN chmod +x /opt/playground/backend/new_scio_project.sh # Install sbt RUN echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee /etc/apt/sources.list.d/sbt.list &&\ @@ -64,8 +65,6 @@ RUN mkdir /opt/mitmproxy &&\ mkdir /usr/local/share/ca-certificates/extra COPY allow_list_proxy.py /opt/mitmproxy/ COPY allow_list.py /opt/mitmproxy/ -ENV HTTP_PROXY="http://127.0.0.1:8081" -ENV HTTPS_PROXY="http://127.0.0.1:8081" COPY src/properties.yaml /opt/playground/backend/properties.yaml COPY entrypoint.sh / @@ -84,4 +83,14 @@ RUN chown -R appuser:appgroup /opt/playground/backend/executable_files/ \ # Switch to appuser USER appuser +# Let sbt download files from Maven +RUN mkdir -p /tmp/sbt-initialize 
+WORKDIR /tmp/sbt-initialize +RUN /opt/playground/backend/new_scio_project.sh +WORKDIR / + +# Enable mitmproxy +ENV HTTP_PROXY="http://127.0.0.1:8081" +ENV HTTPS_PROXY="http://127.0.0.1:8081" + ENTRYPOINT ["/entrypoint.sh"] diff --git a/playground/backend/new_scio_project.sh b/playground/backend/new_scio_project.sh old mode 100644 new mode 100755 From 617c182f8003a361bca5d34ec26c7945ca0a46f4 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Mon, 9 Jan 2023 18:35:25 +0400 Subject: [PATCH 02/17] Fix issue with reading of GRPC_TIMEOUT environment variables in CI/CD scripts --- playground/infrastructure/grpc_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/playground/infrastructure/grpc_client.py b/playground/infrastructure/grpc_client.py index bf34cf5a0f25..da8bb1617b51 100644 --- a/playground/infrastructure/grpc_client.py +++ b/playground/infrastructure/grpc_client.py @@ -34,7 +34,7 @@ class GRPCClient: def __init__(self, wait_for_ready=True): use_webgrpc = os.getenv("BEAM_USE_WEBGRPC", False) - timeout = os.getenv("GRPC_TIMEOUT", 10) + timeout = int(os.getenv("GRPC_TIMEOUT", 10)) if use_webgrpc: self._channel = sonora.aio.insecure_web_channel(Config.SERVER_ADDRESS) else: From b42ad5be11eba6baf669dbaf2e31d798ec9d4ff4 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Mon, 9 Jan 2023 20:05:07 +0400 Subject: [PATCH 03/17] Fix cleanup of execution environment for Scala examples --- .../life_cycle/life_cycle_setuper.go | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go index b7926835f61e..2c6e3ecd3499 100644 --- a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go +++ b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go @@ -45,9 +45,6 @@ const ( scioProjectPath = scioProjectName + "/src/main/scala/" + scioProjectName 
logFileName = "logs.log" defaultExampleInSbt = "WordCount.scala" - shCmd = "sh" - rmCmd = "rm" - cpCmd = "cp" scioProject = "new_scio_project.sh" scioCommonConstants = "ExampleData.scala" ) @@ -126,6 +123,7 @@ func prepareGoFiles(lc *fs_tool.LifeCycle, preparedModDir string, pipelineId uui // prepareJavaFiles prepares file for Java environment. // Copy log config file from /path/to/workingDir to /path/to/workingDir/pipelinesFolder/{pipelineId} +// // and update this file according to pipeline. func prepareJavaFiles(lc *fs_tool.LifeCycle, workingDir string, pipelineId uuid.UUID) error { err := lc.CopyFile(javaLogConfigFileName, workingDir, lc.Paths.AbsoluteBaseFolderPath) @@ -178,7 +176,7 @@ func updateJavaLogConfigFile(paths fs_tool.LifeCyclePaths) error { } func prepareSbtFiles(lc *fs_tool.LifeCycle, pipelineFolder string, workingDir string) (*fs_tool.LifeCycle, error) { - cmd := exec.Command(shCmd, filepath.Join(workingDir, scioProject)) + cmd := exec.Command(filepath.Join(workingDir, scioProject)) cmd.Dir = pipelineFolder _, err := cmd.Output() if err != nil { @@ -194,30 +192,29 @@ func prepareSbtFiles(lc *fs_tool.LifeCycle, pipelineFolder string, workingDir st projectFolder, _ := filepath.Abs(filepath.Join(pipelineFolder, scioProjectName)) executableName := lc.Paths.ExecutableName - _, err = exec.Command(rmCmd, filepath.Join(absFileFolderPath, defaultExampleInSbt)).Output() + err = os.Remove(filepath.Join(absFileFolderPath, defaultExampleInSbt)) if err != nil { return lc, err } - _, err = exec.Command(cpCmd, filepath.Join(workingDir, scioCommonConstants), absFileFolderPath).Output() + err = lc.CopyFile(scioCommonConstants, workingDir, absFileFolderPath) if err != nil { return lc, err } - lc = &fs_tool.LifeCycle{ - Paths: fs_tool.LifeCyclePaths{ - SourceFileName: fileName, - AbsoluteSourceFileFolderPath: absFileFolderPath, - AbsoluteSourceFilePath: absFilePath, - ExecutableFileName: fileName, - AbsoluteExecutableFileFolderPath: absFileFolderPath, - 
AbsoluteExecutableFilePath: absFilePath, - AbsoluteBaseFolderPath: absFileFolderPath, - AbsoluteLogFilePath: absLogFilePath, - AbsoluteGraphFilePath: absGraphFilePath, - ProjectDir: projectFolder, - }, - } - lc.Paths.ExecutableName = executableName + lc.Paths = fs_tool.LifeCyclePaths{ + SourceFileName: fileName, + AbsoluteSourceFileFolderPath: absFileFolderPath, + AbsoluteSourceFilePath: absFilePath, + ExecutableFileName: fileName, + AbsoluteExecutableFileFolderPath: absFileFolderPath, + AbsoluteExecutableFilePath: absFilePath, + AbsoluteBaseFolderPath: absFileFolderPath, + AbsoluteLogFilePath: absLogFilePath, + AbsoluteGraphFilePath: absGraphFilePath, + ProjectDir: projectFolder, + ExecutableName: executableName, + } + return lc, nil } From f59bbe7ef98963457a5d2bedcd5aff2425e19e3f Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Tue, 10 Jan 2023 18:50:03 +0400 Subject: [PATCH 04/17] Fix panic in preparers when an empty file is passed --- playground/backend/internal/utils/common_test.go | 12 ++++++++++++ playground/backend/internal/utils/preparers_utils.go | 11 ++++++++--- .../backend/internal/utils/preparers_utils_test.go | 9 +++++++++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/playground/backend/internal/utils/common_test.go b/playground/backend/internal/utils/common_test.go index b62c8d3fb5f2..cc51aa01b4e0 100644 --- a/playground/backend/internal/utils/common_test.go +++ b/playground/backend/internal/utils/common_test.go @@ -28,6 +28,7 @@ const ( fileName = "file.txt" fileContent = "content" javaFileName = "javaFileName.java" + emptyFileName = "emptyFile.java" pythonExampleName = "wordCount.py" wordCountPython = "import argparse\nimport logging\nimport re\n\nimport apache_beam as beam\nfrom apache_beam.io import ReadFromText\nfrom apache_beam.io import WriteToText\nfrom apache_beam.options.pipeline_options import PipelineOptions\nfrom apache_beam.options.pipeline_options import SetupOptions\n\n\nclass WordExtractingDoFn(beam.DoFn):\n 
\"\"\"Parse each line of input text into words.\"\"\"\n def process(self, element):\n \"\"\"Returns an iterator over the words of this element.\n\n The element is a line of text. If the line is blank, note that, too.\n\n Args:\n element: the element being processed\n\n Returns:\n The processed element.\n \"\"\"\n return re.findall(r'[\\w\\']+', element, re.UNICODE)\n\n\ndef run(argv=None, save_main_session=True):\n \"\"\"Main entry point; defines and runs the wordcount pipeline.\"\"\"\n parser = argparse.ArgumentParser()\n parser.add_argument(\n '--input',\n dest='input',\n default='gs://dataflow-samples/shakespeare/kinglear.txt',\n help='Input file to process.')\n parser.add_argument(\n '--output',\n dest='output',\n required=True,\n help='Output file to write results to.')\n known_args, pipeline_args = parser.parse_known_args(argv)\n\n # We use the save_main_session option because one or more DoFn's in this\n # workflow rely on global context (e.g., a module imported at module level).\n pipeline_options = PipelineOptions(pipeline_args)\n pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n\n # The pipeline will be run on exiting the with block.\n with beam.Pipeline(options=pipeline_options) as p:\n\n # Read the text file[pattern] into a PCollection.\n lines = p | 'Read' >> ReadFromText(known_args.input)\n\n counts = (\n lines\n | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))\n | 'PairWithOne' >> beam.Map(lambda x: (x, 1))\n | 'GroupAndSum' >> beam.CombinePerKey(sum))\n\n # Format the counts into a PCollection of strings.\n def format_result(word, count):\n return '%s: %d' % (word, count)\n\n output = counts | 'Format' >> beam.MapTuple(format_result)\n\n # Write the output using a \"Write\" transform that has side effects.\n # pylint: disable=expression-not-assigned\n output | 'Write' >> WriteToText(known_args.output)\n\n\nif __name__ == '__main__':\n logging.getLogger().setLevel(logging.INFO)\n run()" javaCode = 
"package org.apache.beam.examples;\n\n// beam-playground:\n// name: MinimalWordCount\n// description: An example that counts words in Shakespeare's works.\n// multifile: false\n// default_example: true\n// context_line: 71\n// categories:\n// - Combiners\n// - Filtering\n// - IO\n// - Core Transforms\n// - Quickstart\n\nimport java.util.Arrays;\nimport org.apache.beam.sdk.Pipeline;\nimport org.apache.beam.sdk.io.TextIO;\nimport org.apache.beam.sdk.options.PipelineOptions;\nimport org.apache.beam.sdk.options.PipelineOptionsFactory;\nimport org.apache.beam.sdk.transforms.Count;\nimport org.apache.beam.sdk.transforms.Filter;\nimport org.apache.beam.sdk.transforms.FlatMapElements;\nimport org.apache.beam.sdk.transforms.MapElements;\nimport org.apache.beam.sdk.values.KV;\nimport org.apache.beam.sdk.values.TypeDescriptors;\n\n/**\n * An example that counts words in Shakespeare.\n *\n *

This class, {@link MinimalWordCount}, is the first in a series of four successively more\n * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or\n * argument processing, and focus on construction of the pipeline, which chains together the\n * application of core transforms.\n *\n *

Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the\n * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional\n * concepts.\n *\n *

Concepts:\n *\n *

\n *   1. Reading data from text files\n *   2. Specifying 'inline' transforms\n *   3. Counting items in a PCollection\n *   4. Writing data to text files\n * 
\n *\n *

No arguments are required to run this pipeline. It will be executed with the DirectRunner. You\n * can see the results in the output files in your current working directory, with names like\n * \"wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate\n * file service.\n */\npublic class MinimalWordCount {\n\n public static void main(String[] args) {\n\n // Create a PipelineOptions object. This object lets us set various execution\n // options for our pipeline, such as the runner you wish to use. This example\n // will run with the DirectRunner by default, based on the class path configured\n // in its dependencies.\n PipelineOptions options = PipelineOptionsFactory.create();\n\n // In order to run your pipeline, you need to make following runner specific changes:\n //\n // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner\n // or FlinkRunner.\n // CHANGE 2/3: Specify runner-required options.\n // For BlockingDataflowRunner, set project and temp location as follows:\n // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);\n // dataflowOptions.setRunner(BlockingDataflowRunner.class);\n // dataflowOptions.setProject(\"SET_YOUR_PROJECT_ID_HERE\");\n // dataflowOptions.setTempLocation(\"gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY\");\n // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}\n // for more details.\n // options.as(FlinkPipelineOptions.class)\n // .setRunner(FlinkRunner.class);\n\n // Create the Pipeline object with the options we defined above\n Pipeline p = Pipeline.create(options);\n\n // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set\n // of input text files. 
TextIO.Read returns a PCollection where each element is one line from\n // the input text (a set of Shakespeare's texts).\n\n // This example reads from a public dataset containing the text of King Lear.\n p.apply(TextIO.read().from(\"gs://apache-beam-samples/shakespeare/kinglear.txt\"))\n\n // Concept #2: Apply a FlatMapElements transform the PCollection of text lines.\n // This transform splits the lines in PCollection, where each element is an\n // individual word in Shakespeare's collected texts.\n .apply(\n FlatMapElements.into(TypeDescriptors.strings())\n .via((String line) -> Arrays.asList(line.split(\"[^\\\\p{L}]+\"))))\n // We use a Filter transform to avoid empty word\n .apply(Filter.by((String word) -> !word.isEmpty()))\n // Concept #3: Apply the Count transform to our PCollection of individual words. The Count\n // transform returns a new PCollection of key/value pairs, where each key represents a\n // unique word in the text. The associated value is the occurrence count for that word.\n .apply(Count.perElement())\n // Apply a MapElements transform that formats our PCollection of word counts into a\n // printable string, suitable for writing to an output file.\n .apply(\n MapElements.into(TypeDescriptors.strings())\n .via(\n (KV wordCount) ->\n wordCount.getKey() + \": \" + wordCount.getValue()))\n // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.\n // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of\n // formatted strings) to a series of text files.\n //\n // By default, it will write to a set of files with names like wordcounts-00001-of-00005\n .apply(TextIO.write().to(\"wordcounts\"));\n\n p.run().waitUntilFinish();\n }\n}" @@ -51,8 +52,19 @@ func setup() error { } filePath := filepath.Join(sourceDir, fileName) err = os.WriteFile(filePath, []byte(fileContent), filePermission) + if err != nil { + return err + } javaFilePath := filepath.Join(sourceDir, javaFileName) err = 
os.WriteFile(javaFilePath, []byte(javaCode), filePermission) + if err != nil { + return err + } + emptyFilePath := filepath.Join(sourceDir, emptyFileName) + err = os.WriteFile(emptyFilePath, []byte(""), filePermission) + if err != nil { + return err + } wordCountPythonPath := filepath.Join(sourceDir, pythonExampleName) err = os.WriteFile(wordCountPythonPath, []byte(wordCountPython), filePermission) return err diff --git a/playground/backend/internal/utils/preparers_utils.go b/playground/backend/internal/utils/preparers_utils.go index 50328551f188..5a446ceaaeec 100644 --- a/playground/backend/internal/utils/preparers_utils.go +++ b/playground/backend/internal/utils/preparers_utils.go @@ -82,7 +82,7 @@ func ProcessLine(curLine string, pipelineName *string, spaces *string, regs *[]* return done, definitionType, err } -//getVarName looking for a declaration of a beam pipeline and it's name +// getVarName looking for a declaration of a beam pipeline and it's name func getVarName(line, spaces, pipelineName *string, regs *[]*regexp.Regexp) PipelineDefinitionType { for i, reg := range *regs { found := (*reg).FindAllStringSubmatch(*line, -1) @@ -96,7 +96,7 @@ func getVarName(line, spaces, pipelineName *string, regs *[]*regexp.Regexp) Pipe return 0 } -//addGraphCode adds line for the graph saving to specific place in the code +// addGraphCode adds line for the graph saving to specific place in the code func addGraphCode(line, spaces, pipelineName *string, regs *[]*regexp.Regexp) PipelineDefinitionType { for i, reg := range *regs { found := (*reg).FindAllStringSubmatch(*line, -1) @@ -148,7 +148,12 @@ func GetPublicClassName(filePath, pattern string) (string, error) { return "", err } re := regexp.MustCompile(pattern) - className := re.FindStringSubmatch(string(code))[1] + classNameMatch := re.FindStringSubmatch(string(code)) + if len(classNameMatch) == 0 { + return "", errors.New(fmt.Sprintf("unable to find main class name in file %s", filePath)) + } + + className := 
classNameMatch[1] return className, err } diff --git a/playground/backend/internal/utils/preparers_utils_test.go b/playground/backend/internal/utils/preparers_utils_test.go index 498b2d964e5f..ae40021d7049 100644 --- a/playground/backend/internal/utils/preparers_utils_test.go +++ b/playground/backend/internal/utils/preparers_utils_test.go @@ -177,6 +177,15 @@ func TestGetPublicClassName(t *testing.T) { want: "MinimalWordCount", wantErr: false, }, + { + name: "Get public class name from empty file", + args: args{ + filePath: filepath.Join(sourceDir, emptyFileName), + pattern: javaPublicClassNamePattern, + }, + want: "", + wantErr: true, + }, { name: "Get public class name from non-existent file", args: args{ From 2d782c1d98470f7cec3c75015e86e54c6eb84243 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Tue, 10 Jan 2023 19:27:02 +0400 Subject: [PATCH 05/17] Use better name for SCIO project directory --- .../internal/setup_tools/life_cycle/life_cycle_setuper.go | 2 +- playground/backend/new_scio_project.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go index 2c6e3ecd3499..9fa0826b045d 100644 --- a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go +++ b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go @@ -41,7 +41,7 @@ const ( javaLogFilePlaceholder = "{logFilePath}" goModFileName = "go.mod" goSumFileName = "go.sum" - scioProjectName = "y" + scioProjectName = "scio" scioProjectPath = scioProjectName + "/src/main/scala/" + scioProjectName logFileName = "logs.log" defaultExampleInSbt = "WordCount.scala" diff --git a/playground/backend/new_scio_project.sh b/playground/backend/new_scio_project.sh index a01fc78c923c..61b463eefe0d 100755 --- a/playground/backend/new_scio_project.sh +++ b/playground/backend/new_scio_project.sh @@ -15,4 +15,4 @@ # See the 
License for the specific language governing permissions and # limitations under the License. -yes | sbt new spotify/scio-template.g8 +{ printf scio\\nscio\\n; yes; } | sbt new spotify/scio-template.g8 From 3960bf0ddb652ed53b3338b2068a309c8e5800f7 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Wed, 11 Jan 2023 18:57:29 +0400 Subject: [PATCH 06/17] Run "sbt compile" during container build to fetch all Scala dependencies --- playground/backend/containers/scio/Dockerfile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/playground/backend/containers/scio/Dockerfile b/playground/backend/containers/scio/Dockerfile index 0aa1ff54b8dc..1e759539ea40 100644 --- a/playground/backend/containers/scio/Dockerfile +++ b/playground/backend/containers/scio/Dockerfile @@ -87,7 +87,10 @@ USER appuser RUN mkdir -p /tmp/sbt-initialize WORKDIR /tmp/sbt-initialize RUN /opt/playground/backend/new_scio_project.sh +WORKDIR /tmp/sbt-initialize/scio +RUN sbt "+compile" WORKDIR / +RUN rm -r /tmp/sbt-initialize # Enable mitmproxy ENV HTTP_PROXY="http://127.0.0.1:8081" From e8689bdcecdec5af6a1f7294ca301ed7a69d362d Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Thu, 12 Jan 2023 14:55:14 +0400 Subject: [PATCH 07/17] Disable forking JVM in SBT to significantly reduce memory usage --- playground/backend/new_scio_project.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/playground/backend/new_scio_project.sh b/playground/backend/new_scio_project.sh index 61b463eefe0d..80e031f0fd8c 100755 --- a/playground/backend/new_scio_project.sh +++ b/playground/backend/new_scio_project.sh @@ -16,3 +16,5 @@ # limitations under the License. 
{ printf scio\\nscio\\n; yes; } | sbt new spotify/scio-template.g8 + +echo "Compile / run / fork := false" >> scio/build.sbt From 08473d237b323a71691624a2d4a4df3e8825e654 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Thu, 12 Jan 2023 14:56:41 +0400 Subject: [PATCH 08/17] Impose memory limits on local deployments of SCIO runner container to better imitate real deployments --- playground/docker-compose.local.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/playground/docker-compose.local.yaml b/playground/docker-compose.local.yaml index e202b6a4dca7..92121a2f0c5d 100644 --- a/playground/docker-compose.local.yaml +++ b/playground/docker-compose.local.yaml @@ -91,6 +91,10 @@ services: - "8090:8090" depends_on: - redis + # Impose limits on container to better mimic real deployment environment + mem_limit: 2048M + ulimits: + rss: 1073741824 frontend: image: apache/beam_playground-frontend From eecba19f6c300e5208a040d69c4ad3e64c366c01 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Thu, 12 Jan 2023 15:24:00 +0400 Subject: [PATCH 09/17] Fine-tune Java GC to improve performance and memory usage of SCIO examples --- playground/backend/containers/scio/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/playground/backend/containers/scio/Dockerfile b/playground/backend/containers/scio/Dockerfile index 1e759539ea40..1f20b252109d 100644 --- a/playground/backend/containers/scio/Dockerfile +++ b/playground/backend/containers/scio/Dockerfile @@ -95,5 +95,6 @@ RUN rm -r /tmp/sbt-initialize # Enable mitmproxy ENV HTTP_PROXY="http://127.0.0.1:8081" ENV HTTPS_PROXY="http://127.0.0.1:8081" +ENV SBT_OPTS="-Xmx512M -XX:+UseG1GC -XX:+UseStringDeduplication" ENTRYPOINT ["/entrypoint.sh"] From d64540822321c983203546aef56d44ec45368b15 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Fri, 27 Jan 2023 14:59:00 +0400 Subject: [PATCH 10/17] Remove large blobs of text from common_test.go --- .../code_processing/code_processing.go | 24 ++-- 
playground/backend/internal/fs_tool/fs.go | 36 +----- .../backend/internal/fs_tool/fs_test.go | 6 +- .../life_cycle/life_cycle_setuper.go | 13 +- .../life_cycle/life_cycle_setuper_test.go | 8 +- playground/backend/internal/utils/common.go | 53 +++++++-- .../backend/internal/utils/common_test.go | 99 ---------------- .../internal/utils/preparers_utils_test.go | 55 +++++++++ .../utils/test_data/JavaFileName.java | 23 ++++ .../internal/utils/test_data/wordcount.py | 111 ++++++++++++++++++ 10 files changed, 263 insertions(+), 165 deletions(-) create mode 100644 playground/backend/internal/utils/test_data/JavaFileName.java create mode 100644 playground/backend/internal/utils/test_data/wordcount.py diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go index b7cfd09e3f83..e380d5d8dd77 100644 --- a/playground/backend/internal/code_processing/code_processing.go +++ b/playground/backend/internal/code_processing/code_processing.go @@ -370,9 +370,12 @@ func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError io.Writer, su // reconcileBackgroundTask waits when first background task finishes. // If finishes by canceling, timeout or context is done - returns error. // If cmd operation (Validate/Prepare/Compile/Run/RunTest) finishes successfully with no error -// during step processing - returns true. +// +// during step processing - returns true. +// // If cmd operation (Validate/Prepare/Compile/Run/RunTest) finishes successfully but with some error -// during step processing - returns false. +// +// during step processing - returns false. 
func reconcileBackgroundTask(pipelineLifeCycleCtx, backgroundCtx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChannel, successChannel chan bool) (bool, error) { select { case <-pipelineLifeCycleCtx.Done(): @@ -422,21 +425,21 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer case <-ticker.C: if _, err := os.Stat(graphFilePath); err == nil { ticker.Stop() - graph, err := utils.ReadFile(pipelineId, graphFilePath) + graph, err := os.ReadFile(graphFilePath) if err != nil { logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error()) } - _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, graph) + _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, string(graph)) } // in case of timeout or cancel case <-pipelineLifeCycleCtx.Done(): ticker.Stop() if _, err := os.Stat(graphFilePath); err == nil { - graph, err := utils.ReadFile(pipelineId, graphFilePath) + graph, err := os.ReadFile(graphFilePath) if err != nil { logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error()) } - _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, graph) + _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, string(graph)) } return } @@ -446,8 +449,10 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer // readLogFile reads logs from the log file and keeps it to the cache. // If context is done it means that the code processing was finished (successfully/with error/timeout). Write last logs to the cache. // If <-stopReadLogsChannel it means that the code processing was finished (canceled/timeout) -// and it waits until the method stops the work to change status to the pb.Status_STATUS_FINISHED. Write last logs +// +// and it waits until the method stops the work to change status to the pb.Status_STATUS_FINISHED. 
Write last logs // to the cache and set value to the finishReadLogChannel channel to unblock the code processing. +// // In other case each pauseDuration write to cache logs of the code processing. func readLogFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID, stopReadLogsChannel, finishReadLogChannel chan bool) { ticker := time.NewTicker(pauseDuration) @@ -476,8 +481,10 @@ func finishReadLogFile(ctx context.Context, ticker *time.Ticker, cacheService ca // writeLogsToCache write all logs from the log file to the cache. // If log file doesn't exist, return nil. +// // Reading logs works as a parallel with code processing so when program tries to read file // it could be that the file doesn't exist yet. +// // If log file exists, read all from the log file and keep it to the cache using cache.Logs subKey. // If some error occurs, log the error and return the error. func writeLogsToCache(ctx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error { @@ -527,6 +534,7 @@ func processErrorWithSavingOutput(ctx context.Context, err error, errorOutput [] // processRunError processes error received during processing run step. // This method sets error output to the cache and after that sets value to channel to stop goroutine which writes logs. +// // After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method // sets corresponding status to the cache. func processRunError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error { @@ -553,6 +561,7 @@ func processSuccess(ctx context.Context, pipelineId uuid.UUID, cacheService cach // processCompileSuccess processes case after successful compile step. 
// This method sets output of the compile step, sets empty string as output of the run step and +// // sets corresponding status to the cache. func processCompileSuccess(ctx context.Context, output []byte, pipelineId uuid.UUID, cacheService cache.Cache) error { logger.Infof("%s: Compile() finish\n", pipelineId) @@ -577,6 +586,7 @@ func processCompileSuccess(ctx context.Context, output []byte, pipelineId uuid.U // processRunSuccess processes case after successful run step. // This method sets value to channel to stop goroutine which writes logs. +// // After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method // sets corresponding status to the cache. func processRunSuccess(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error { diff --git a/playground/backend/internal/fs_tool/fs.go b/playground/backend/internal/fs_tool/fs.go index 016014d1d40d..2f5376dcb5a8 100644 --- a/playground/backend/internal/fs_tool/fs.go +++ b/playground/backend/internal/fs_tool/fs.go @@ -17,12 +17,9 @@ package fs_tool import ( "fmt" - "io" + "github.com/google/uuid" "io/fs" "os" - "path/filepath" - - "github.com/google/uuid" pb "beam.apache.org/playground/backend/internal/api/v1" "beam.apache.org/playground/backend/internal/db/entity" @@ -112,34 +109,3 @@ func (lc *LifeCycle) CreateSourceCodeFiles(sources []entity.FileEntity) error { } return nil } - -// CopyFile copies a file with fileName from sourceDir to destinationDir. 
-func (lc *LifeCycle) CopyFile(fileName, sourceDir, destinationDir string) error { - absSourcePath := filepath.Join(sourceDir, fileName) - absDestinationPath := filepath.Join(destinationDir, fileName) - sourceFileStat, err := os.Stat(absSourcePath) - if err != nil { - return err - } - - if !sourceFileStat.Mode().IsRegular() { - return fmt.Errorf("%s is not a regular file", fileName) - } - - sourceFile, err := os.Open(absSourcePath) - if err != nil { - return err - } - defer sourceFile.Close() - - destinationFile, err := os.Create(absDestinationPath) - if err != nil { - return err - } - defer destinationFile.Close() - _, err = io.Copy(destinationFile, sourceFile) - if err != nil { - return err - } - return nil -} diff --git a/playground/backend/internal/fs_tool/fs_test.go b/playground/backend/internal/fs_tool/fs_test.go index 7e4cc36221f7..4241fd9a486c 100644 --- a/playground/backend/internal/fs_tool/fs_test.go +++ b/playground/backend/internal/fs_tool/fs_test.go @@ -135,11 +135,7 @@ func TestLifeCycle_CopyFile(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - l := &LifeCycle{ - folderGlobs: tt.fields.folderGlobs, - Paths: tt.fields.Paths, - } - if err := l.CopyFile(tt.args.fileName, tt.args.sourceDir, tt.args.destinationDir); (err != nil) != tt.wantErr { + if err := utils.CopyFilePreservingName(tt.args.fileName, tt.args.sourceDir, tt.args.destinationDir); (err != nil) != tt.wantErr { t.Errorf("CopyFile() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go index 9fa0826b045d..afdb04284ce9 100644 --- a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go +++ b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go @@ -32,7 +32,7 @@ import ( "beam.apache.org/playground/backend/internal/emulators" 
"beam.apache.org/playground/backend/internal/fs_tool" "beam.apache.org/playground/backend/internal/logger" - "beam.apache.org/playground/backend/internal/utils" + utils "beam.apache.org/playground/backend/internal/utils" ) const ( @@ -41,6 +41,7 @@ const ( javaLogFilePlaceholder = "{logFilePath}" goModFileName = "go.mod" goSumFileName = "go.sum" + bashCmd = "bash" scioProjectName = "scio" scioProjectPath = scioProjectName + "/src/main/scala/" + scioProjectName logFileName = "logs.log" @@ -110,11 +111,11 @@ func Setup(sdk pb.Sdk, sources []entity.FileEntity, pipelineId uuid.UUID, workin // prepareGoFiles prepares file for Go environment. // Copy go.mod and go.sum file from /path/to/preparedModDir to /path/to/workingDir/pipelinesFolder/{pipelineId} func prepareGoFiles(lc *fs_tool.LifeCycle, preparedModDir string, pipelineId uuid.UUID) error { - if err := lc.CopyFile(goModFileName, preparedModDir, lc.Paths.AbsoluteBaseFolderPath); err != nil { + if err := utils.CopyFilePreservingName(goModFileName, preparedModDir, lc.Paths.AbsoluteBaseFolderPath); err != nil { logger.Errorf("%s: error during copying %s file: %s\n", pipelineId, goModFileName, err.Error()) return err } - if err := lc.CopyFile(goSumFileName, preparedModDir, lc.Paths.AbsoluteBaseFolderPath); err != nil { + if err := utils.CopyFilePreservingName(goSumFileName, preparedModDir, lc.Paths.AbsoluteBaseFolderPath); err != nil { logger.Errorf("%s: error during copying %s file: %s\n", pipelineId, goSumFileName, err.Error()) return err } @@ -126,7 +127,7 @@ func prepareGoFiles(lc *fs_tool.LifeCycle, preparedModDir string, pipelineId uui // // and update this file according to pipeline. 
func prepareJavaFiles(lc *fs_tool.LifeCycle, workingDir string, pipelineId uuid.UUID) error { - err := lc.CopyFile(javaLogConfigFileName, workingDir, lc.Paths.AbsoluteBaseFolderPath) + err := utils.CopyFilePreservingName(javaLogConfigFileName, workingDir, lc.Paths.AbsoluteBaseFolderPath) if err != nil { logger.Errorf("%s: error during copying logging.properties file: %s\n", pipelineId, err.Error()) return err @@ -176,7 +177,7 @@ func updateJavaLogConfigFile(paths fs_tool.LifeCyclePaths) error { } func prepareSbtFiles(lc *fs_tool.LifeCycle, pipelineFolder string, workingDir string) (*fs_tool.LifeCycle, error) { - cmd := exec.Command(filepath.Join(workingDir, scioProject)) + cmd := exec.Command(bashCmd, filepath.Join(workingDir, scioProject)) cmd.Dir = pipelineFolder _, err := cmd.Output() if err != nil { @@ -197,7 +198,7 @@ func prepareSbtFiles(lc *fs_tool.LifeCycle, pipelineFolder string, workingDir st return lc, err } - err = lc.CopyFile(scioCommonConstants, workingDir, absFileFolderPath) + err = utils.CopyFilePreservingName(scioCommonConstants, workingDir, absFileFolderPath) if err != nil { return lc, err } diff --git a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go index 556b4259c443..3bf20833aee9 100644 --- a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go +++ b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go @@ -284,7 +284,13 @@ func TestSetup(t *testing.T) { pipelinesFolder: pipelinesFolder, }, prep: func() error { - _, err := os.Create(filepath.Join(workingDir, scioCommonConstants)) + sourceScioShFile := "../../../new_scio_project.sh" + scioShFile := filepath.Join(workingDir, scioProject) + err := utils.CopyFile(sourceScioShFile, scioShFile) + if err != nil { + return err + } + _, err = os.Create(filepath.Join(workingDir, scioCommonConstants)) if err != nil { return err } diff --git 
a/playground/backend/internal/utils/common.go b/playground/backend/internal/utils/common.go index 234be2769312..9720ed5df9e3 100644 --- a/playground/backend/internal/utils/common.go +++ b/playground/backend/internal/utils/common.go @@ -17,9 +17,12 @@ package utils import ( "beam.apache.org/playground/backend/internal/logger" - "github.com/google/uuid" + "fmt" "gopkg.in/yaml.v3" + "io" "io/ioutil" + "os" + "path/filepath" "regexp" ) @@ -28,17 +31,7 @@ func ReduceWhiteSpacesToSinge(s string) string { return re.ReplaceAllString(s, " ") } -//ReadFile reads from file and returns string. -func ReadFile(pipelineId uuid.UUID, path string) (string, error) { - content, err := ioutil.ReadFile(path) - if err != nil { - logger.Errorf("%s: ReadFile(): error during reading from a file: %s", pipelineId, err.Error()) - return "", err - } - return string(content), nil -} - -//ReadYamlFile reads from a yaml file. +// ReadYamlFile reads from a yaml file. func ReadYamlFile(filename string, out interface{}) error { buf, err := ioutil.ReadFile(filename) if err != nil { @@ -51,3 +44,39 @@ func ReadYamlFile(filename string, out interface{}) error { } return nil } + +// CopyFilePreservingName copies a file with fileName from sourceDir to destinationDir. 
+func CopyFilePreservingName(fileName, sourceDir, destinationDir string) error { + absSourcePath := filepath.Join(sourceDir, fileName) + absDestinationPath := filepath.Join(destinationDir, fileName) + return CopyFile(absSourcePath, absDestinationPath) +} + +// CopyFile copies a file from sourcePath to destinationPath +func CopyFile(sourcePath, destinationPath string) error { + sourceFileStat, err := os.Stat(sourcePath) + if err != nil { + return err + } + + if !sourceFileStat.Mode().IsRegular() { + return fmt.Errorf("%s is not a regular file", sourcePath) + } + + sourceFile, err := os.Open(sourcePath) + if err != nil { + return err + } + defer sourceFile.Close() + + destinationFile, err := os.Create(destinationPath) + if err != nil { + return err + } + defer destinationFile.Close() + _, err = io.Copy(destinationFile, sourceFile) + if err != nil { + return err + } + return nil +} diff --git a/playground/backend/internal/utils/common_test.go b/playground/backend/internal/utils/common_test.go index cc51aa01b4e0..7303582f1010 100644 --- a/playground/backend/internal/utils/common_test.go +++ b/playground/backend/internal/utils/common_test.go @@ -16,64 +16,9 @@ package utils import ( - "fmt" - "github.com/google/uuid" - "os" - "path/filepath" "testing" ) -const ( - sourceDir = "sourceDir" - fileName = "file.txt" - fileContent = "content" - javaFileName = "javaFileName.java" - emptyFileName = "emptyFile.java" - pythonExampleName = "wordCount.py" - wordCountPython = "import argparse\nimport logging\nimport re\n\nimport apache_beam as beam\nfrom apache_beam.io import ReadFromText\nfrom apache_beam.io import WriteToText\nfrom apache_beam.options.pipeline_options import PipelineOptions\nfrom apache_beam.options.pipeline_options import SetupOptions\n\n\nclass WordExtractingDoFn(beam.DoFn):\n \"\"\"Parse each line of input text into words.\"\"\"\n def process(self, element):\n \"\"\"Returns an iterator over the words of this element.\n\n The element is a line of text. 
If the line is blank, note that, too.\n\n Args:\n element: the element being processed\n\n Returns:\n The processed element.\n \"\"\"\n return re.findall(r'[\\w\\']+', element, re.UNICODE)\n\n\ndef run(argv=None, save_main_session=True):\n \"\"\"Main entry point; defines and runs the wordcount pipeline.\"\"\"\n parser = argparse.ArgumentParser()\n parser.add_argument(\n '--input',\n dest='input',\n default='gs://dataflow-samples/shakespeare/kinglear.txt',\n help='Input file to process.')\n parser.add_argument(\n '--output',\n dest='output',\n required=True,\n help='Output file to write results to.')\n known_args, pipeline_args = parser.parse_known_args(argv)\n\n # We use the save_main_session option because one or more DoFn's in this\n # workflow rely on global context (e.g., a module imported at module level).\n pipeline_options = PipelineOptions(pipeline_args)\n pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n\n # The pipeline will be run on exiting the with block.\n with beam.Pipeline(options=pipeline_options) as p:\n\n # Read the text file[pattern] into a PCollection.\n lines = p | 'Read' >> ReadFromText(known_args.input)\n\n counts = (\n lines\n | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))\n | 'PairWithOne' >> beam.Map(lambda x: (x, 1))\n | 'GroupAndSum' >> beam.CombinePerKey(sum))\n\n # Format the counts into a PCollection of strings.\n def format_result(word, count):\n return '%s: %d' % (word, count)\n\n output = counts | 'Format' >> beam.MapTuple(format_result)\n\n # Write the output using a \"Write\" transform that has side effects.\n # pylint: disable=expression-not-assigned\n output | 'Write' >> WriteToText(known_args.output)\n\n\nif __name__ == '__main__':\n logging.getLogger().setLevel(logging.INFO)\n run()" - javaCode = "package org.apache.beam.examples;\n\n// beam-playground:\n// name: MinimalWordCount\n// description: An example that counts words in Shakespeare's works.\n// multifile: false\n// 
default_example: true\n// context_line: 71\n// categories:\n// - Combiners\n// - Filtering\n// - IO\n// - Core Transforms\n// - Quickstart\n\nimport java.util.Arrays;\nimport org.apache.beam.sdk.Pipeline;\nimport org.apache.beam.sdk.io.TextIO;\nimport org.apache.beam.sdk.options.PipelineOptions;\nimport org.apache.beam.sdk.options.PipelineOptionsFactory;\nimport org.apache.beam.sdk.transforms.Count;\nimport org.apache.beam.sdk.transforms.Filter;\nimport org.apache.beam.sdk.transforms.FlatMapElements;\nimport org.apache.beam.sdk.transforms.MapElements;\nimport org.apache.beam.sdk.values.KV;\nimport org.apache.beam.sdk.values.TypeDescriptors;\n\n/**\n * An example that counts words in Shakespeare.\n *\n *

This class, {@link MinimalWordCount}, is the first in a series of four successively more\n * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or\n * argument processing, and focus on construction of the pipeline, which chains together the\n * application of core transforms.\n *\n *

Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the\n * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional\n * concepts.\n *\n *

Concepts:\n *\n *

\n *   1. Reading data from text files\n *   2. Specifying 'inline' transforms\n *   3. Counting items in a PCollection\n *   4. Writing data to text files\n * 
\n *\n *

No arguments are required to run this pipeline. It will be executed with the DirectRunner. You\n * can see the results in the output files in your current working directory, with names like\n * \"wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate\n * file service.\n */\npublic class MinimalWordCount {\n\n public static void main(String[] args) {\n\n // Create a PipelineOptions object. This object lets us set various execution\n // options for our pipeline, such as the runner you wish to use. This example\n // will run with the DirectRunner by default, based on the class path configured\n // in its dependencies.\n PipelineOptions options = PipelineOptionsFactory.create();\n\n // In order to run your pipeline, you need to make following runner specific changes:\n //\n // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner\n // or FlinkRunner.\n // CHANGE 2/3: Specify runner-required options.\n // For BlockingDataflowRunner, set project and temp location as follows:\n // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);\n // dataflowOptions.setRunner(BlockingDataflowRunner.class);\n // dataflowOptions.setProject(\"SET_YOUR_PROJECT_ID_HERE\");\n // dataflowOptions.setTempLocation(\"gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY\");\n // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}\n // for more details.\n // options.as(FlinkPipelineOptions.class)\n // .setRunner(FlinkRunner.class);\n\n // Create the Pipeline object with the options we defined above\n Pipeline p = Pipeline.create(options);\n\n // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set\n // of input text files. 
TextIO.Read returns a PCollection where each element is one line from\n // the input text (a set of Shakespeare's texts).\n\n // This example reads from a public dataset containing the text of King Lear.\n p.apply(TextIO.read().from(\"gs://apache-beam-samples/shakespeare/kinglear.txt\"))\n\n // Concept #2: Apply a FlatMapElements transform the PCollection of text lines.\n // This transform splits the lines in PCollection, where each element is an\n // individual word in Shakespeare's collected texts.\n .apply(\n FlatMapElements.into(TypeDescriptors.strings())\n .via((String line) -> Arrays.asList(line.split(\"[^\\\\p{L}]+\"))))\n // We use a Filter transform to avoid empty word\n .apply(Filter.by((String word) -> !word.isEmpty()))\n // Concept #3: Apply the Count transform to our PCollection of individual words. The Count\n // transform returns a new PCollection of key/value pairs, where each key represents a\n // unique word in the text. The associated value is the occurrence count for that word.\n .apply(Count.perElement())\n // Apply a MapElements transform that formats our PCollection of word counts into a\n // printable string, suitable for writing to an output file.\n .apply(\n MapElements.into(TypeDescriptors.strings())\n .via(\n (KV wordCount) ->\n wordCount.getKey() + \": \" + wordCount.getValue()))\n // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.\n // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of\n // formatted strings) to a series of text files.\n //\n // By default, it will write to a set of files with names like wordcounts-00001-of-00005\n .apply(TextIO.write().to(\"wordcounts\"));\n\n p.run().waitUntilFinish();\n }\n}" - filePermission = 0600 - fullPermission = 0755 -) - -func TestMain(m *testing.M) { - err := setup() - if err != nil { - panic(fmt.Errorf("error during test setup: %s", err.Error())) - } - defer teardown() - m.Run() -} - -func setup() error { - err := 
os.Mkdir(sourceDir, fullPermission) - if err != nil { - return err - } - filePath := filepath.Join(sourceDir, fileName) - err = os.WriteFile(filePath, []byte(fileContent), filePermission) - if err != nil { - return err - } - javaFilePath := filepath.Join(sourceDir, javaFileName) - err = os.WriteFile(javaFilePath, []byte(javaCode), filePermission) - if err != nil { - return err - } - emptyFilePath := filepath.Join(sourceDir, emptyFileName) - err = os.WriteFile(emptyFilePath, []byte(""), filePermission) - if err != nil { - return err - } - wordCountPythonPath := filepath.Join(sourceDir, pythonExampleName) - err = os.WriteFile(wordCountPythonPath, []byte(wordCountPython), filePermission) - return err -} - -func teardown() error { - return os.RemoveAll(sourceDir) -} - func TestReduceWhiteSpacesToSinge(t *testing.T) { type args struct { s string @@ -94,47 +39,3 @@ func TestReduceWhiteSpacesToSinge(t *testing.T) { }) } } - -func TestReadFile(t *testing.T) { - type args struct { - pipelineId uuid.UUID - path string - } - tests := []struct { - name string - args args - want string - wantErr bool - }{ - { - name: "Read from existing file", - args: args{ - pipelineId: uuid.New(), - path: filepath.Join(sourceDir, fileName), - }, - want: fileContent, - wantErr: false, - }, - { - name: "Read from non-existent file", - args: args{ - pipelineId: uuid.New(), - path: filepath.Join(sourceDir, "non-existent_file.txt"), - }, - want: "", - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := ReadFile(tt.args.pipelineId, tt.args.path) - if (err != nil) != tt.wantErr { - t.Errorf("ReadFile() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("ReadFile() got = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/playground/backend/internal/utils/preparers_utils_test.go b/playground/backend/internal/utils/preparers_utils_test.go index ae40021d7049..e2f5d7a9affb 100644 --- 
a/playground/backend/internal/utils/preparers_utils_test.go +++ b/playground/backend/internal/utils/preparers_utils_test.go @@ -27,6 +27,61 @@ import ( "testing" ) +const ( + sourceDir = "sourceDir" + fileName = "file.txt" + fileContent = "content" + testDataDir = "test_data" + javaFileName = "JavaFileName.java" + emptyFileName = "emptyFile.java" + pythonExampleName = "wordcount.py" + filePermission = 0600 + fullPermission = 0755 +) + +func TestMain(m *testing.M) { + err := setup() + if err != nil { + panic(fmt.Errorf("error during test setup: %s", err.Error())) + } + defer teardown() + m.Run() +} + +func setup() error { + err := os.Mkdir(sourceDir, fullPermission) + if err != nil { + return err + } + filePath := filepath.Join(sourceDir, fileName) + err = os.WriteFile(filePath, []byte(fileContent), filePermission) + if err != nil { + return err + } + sourceJavaFilePath := filepath.Join(testDataDir, javaFileName) + javaFilePath := filepath.Join(sourceDir, javaFileName) + err = CopyFile(sourceJavaFilePath, javaFilePath) + if err != nil { + return err + } + if err != nil { + return err + } + emptyFilePath := filepath.Join(sourceDir, emptyFileName) + err = os.WriteFile(emptyFilePath, []byte(""), filePermission) + if err != nil { + return err + } + sourceWordCountPythonPath := filepath.Join(testDataDir, pythonExampleName) + wordCountPythonPath := filepath.Join(sourceDir, pythonExampleName) + err = CopyFile(sourceWordCountPythonPath, wordCountPythonPath) + return err +} + +func teardown() error { + return os.RemoveAll(sourceDir) +} + func TestSpacesToEqualsOption(t *testing.T) { type args struct { pipelineOptions string diff --git a/playground/backend/internal/utils/test_data/JavaFileName.java b/playground/backend/internal/utils/test_data/JavaFileName.java new file mode 100644 index 000000000000..9eb8e0e54cf6 --- /dev/null +++ b/playground/backend/internal/utils/test_data/JavaFileName.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +public class MinimalWordCount { + public static void main(String[] args) { + } +} diff --git a/playground/backend/internal/utils/test_data/wordcount.py b/playground/backend/internal/utils/test_data/wordcount.py new file mode 100644 index 000000000000..f4e32c902b0b --- /dev/null +++ b/playground/backend/internal/utils/test_data/wordcount.py @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""A word-counting workflow.""" + +# pytype: skip-file + +# beam-playground: +# name: WordCount +# description: An example that counts words in Shakespeare's works. +# multifile: false +# pipeline_options: --output output.txt +# context_line: 44 +# categories: +# - Combiners +# - Options +# - Quickstart +# complexity: MEDIUM +# tags: +# - options +# - count +# - combine +# - strings + +import argparse +import logging +import re + +import apache_beam as beam +from apache_beam.io import ReadFromText +from apache_beam.io import WriteToText +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions + + +class WordExtractingDoFn(beam.DoFn): + """Parse each line of input text into words.""" + def process(self, element): + """Returns an iterator over the words of this element. + + The element is a line of text. If the line is blank, note that, too. + + Args: + element: the element being processed + + Returns: + The processed element. + """ + return re.findall(r'[\w\']+', element, re.UNICODE) + + +def run(argv=None, save_main_session=True): + """Main entry point; defines and runs the wordcount pipeline.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + dest='input', + default='gs://dataflow-samples/shakespeare/kinglear.txt', + help='Input file to process.') + parser.add_argument( + '--output', + dest='output', + required=True, + help='Output file to write results to.') + known_args, pipeline_args = parser.parse_known_args(argv) + + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # The pipeline will be run on exiting the with block. + with beam.Pipeline(options=pipeline_options) as p: + + # Read the text file[pattern] into a PCollection. 
+ lines = p | 'Read' >> ReadFromText(known_args.input) + + counts = ( + lines + | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) + | 'GroupAndSum' >> beam.CombinePerKey(sum)) + + # Format the counts into a PCollection of strings. + def format_result(word, count): + return '%s: %d' % (word, count) + + output = counts | 'Format' >> beam.MapTuple(format_result) + + # Write the output using a "Write" transform that has side effects. + # pylint: disable=expression-not-assigned + output | 'Write' >> WriteToText(known_args.output) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() From fd908d2c8c13c9714627724ed689e29b40a2ab43 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Wed, 8 Feb 2023 13:14:09 +0400 Subject: [PATCH 11/17] Add `sbt` to the list of development dependencies --- playground/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/playground/README.md b/playground/README.md index e92effe1697a..49ef792daacf 100644 --- a/playground/README.md +++ b/playground/README.md @@ -36,6 +36,7 @@ The following requirements are needed for development, testing, and deploying. 
- [gcloud CLI](https://cloud.google.com/sdk/docs/install) - [gcloud Beta Commands](https://cloud.google.com/sdk/gcloud/reference/components/install) - [Cloud Datastore Emulator](https://cloud.google.com/sdk/gcloud/reference/components/install) +- [sbt](https://www.scala-sbt.org/) # Available Gradle Tasks From c8900dfa2a04c76ca538535bdb2e355d001ccc64 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Thu, 16 Feb 2023 16:51:43 +0400 Subject: [PATCH 12/17] Clarify running of backend tests in Playground --- playground/README.md | 34 ++++++++++++++++++++++++++++++++- playground/backend/README.md | 37 +++++++++++++++++++++++++++++------- 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/playground/README.md b/playground/README.md index 49ef792daacf..b4a0645becd7 100644 --- a/playground/README.md +++ b/playground/README.md @@ -35,12 +35,44 @@ The following requirements are needed for development, testing, and deploying. - [Docker Compose](https://docs.docker.com/compose/install/) - [gcloud CLI](https://cloud.google.com/sdk/docs/install) - [gcloud Beta Commands](https://cloud.google.com/sdk/gcloud/reference/components/install) -- [Cloud Datastore Emulator](https://cloud.google.com/sdk/gcloud/reference/components/install) +- [Cloud Datastore Emulator](https://cloud.google.com/datastore/docs/tools/datastore-emulator) - [sbt](https://www.scala-sbt.org/) +### Google Cloud Shell Prerequisites Installation +Google Cloud Shell already has most of the prerequisites installed. 
Only a few tools need to be installed separately. +#### Flutter +```shell +git config --global --add safe.directory /google/flutter +flutter doctor +``` + +#### Protobuf +```shell +go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28 +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2 +dart pub global activate protoc_plugin +npm install -g @bufbuild/buf +``` +#### sbt +```shell +echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list +echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list +curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import +sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg +sudo apt-get update +sudo apt-get install sbt +``` +### Additional tools +Google Cloud Shell machines do not have `netcat` and `lsof` preinstalled. Install them using: +```shell +sudo apt install netcat lsof +``` + +# Available Gradle Tasks ## Perform overall pre-commit checks +> **Google Cloud Shell note:** run `unset GOOGLE_CLOUD_PROJECT` before running tests so that they use the locally running datastore emulator. ``` cd beam diff --git a/playground/backend/README.md b/playground/backend/README.md index 1b3ad66f2b42..d882095ec090 100644 --- a/playground/backend/README.md +++ b/playground/backend/README.md @@ -27,7 +27,7 @@ no setup. ## Getting Started -See [playground/README.md](../README.md) for details on requirements and setup. +See [playground/README.md](../README.md) for details on installing development dependencies. This section describes what is needed to run the backend application. @@ -35,8 +35,25 @@ This section describes what is needed to run the backend application. 
- Set up environment variables to run the backend locally - Running the backend via Docker -### Go commands to run/test application locally +## Go commands to run/test application locally +### Prerequisite + +> **Google Cloud Shell note:** `start_datastore_emulator.sh` script makes use of `nc` and `lsof` commands which are not installed on Google Cloud Shell machines. You can install them using `sudo apt install netcat lsof`. + +> **Google Cloud Shell note:** run `unset GOOGLE_CLOUD_PROJECT` before running tests so they would use locally running datastore emulator. + +Start datastore emulator +```shell +$ bash start_datastore_emulator.sh +``` + +After you have finished running tests +```shell +$ bash stop_datastore_emulator.sh +``` + +### Run/build Go to the backend directory: ```shell @@ -46,7 +63,12 @@ $ cd backend The following command is used to build and serve the backend locally: ```shell -$ go run ./cmd/server/server.go +DATASTORE_EMULATOR_HOST=127.0.0.1:8888 \ +DATASTORE_PROJECT_ID=test \ +SDK_CONFIG=../sdks-emulator.yaml \ +BEAM_SDK="SDK_UNSPECIFIED" \ +APP_WORK_DIR= \ +go run ./cmd/server ``` Run the following command to generate a release build file: @@ -55,15 +77,16 @@ Run the following command to generate a release build file: $ go build ./cmd/server/server.go ``` +### Test Playground tests may be run using this command: ```shell -$ go test ... -v +$ go test ./... -v ``` The full list of commands can be found [here](https://pkg.go.dev/cmd/go). 
-### Set up environment variables to run the backend locally +## Set up environment variables to run the backend locally These environment variables should be set to run the backend locally: @@ -96,7 +119,7 @@ default value and there is no need to set them up to launch locally: - `PROPERTY_PATH` - is the application properties path (default value = `.`) - `CACHE_REQUEST_TIMEOUT` - is the timeout to request data from cache (default value = `5 sec`) -### Application properties +## Application properties These properties are stored in `backend/properties.yaml` file: @@ -106,7 +129,7 @@ These properties are stored in `backend/properties.yaml` file: - `removing_unused_snippets_cron` - is the cron expression for the scheduled task to remove unused snippets. - `removing_unused_snippets_days` - is the number of days after which a snippet becomes unused. -### Running the server app via Docker +## Running the server app via Docker To run the server using Docker images there are `Docker` files in the `containers` folder for Java, Python and Go languages. 
Each of them processes the corresponding SDK, so the backend with Go SDK will work with Go From e12764712a50fd4da676c23f6f6fbb33344a40d9 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Fri, 17 Feb 2023 18:03:57 +0400 Subject: [PATCH 13/17] Clarify local running of backend --- playground/backend/README.md | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/playground/backend/README.md b/playground/backend/README.md index d882095ec090..916127dc4f47 100644 --- a/playground/backend/README.md +++ b/playground/backend/README.md @@ -60,17 +60,35 @@ Go to the backend directory: $ cd backend ``` -The following command is used to build and serve the backend locally: +To run backend server on development machine without using docker you'll need first to prepare a working directory anywhere outside of Beam source tree: +```shell +mkdir ~/path/to/workdir +``` +and then copy `datasets/` and `configs/` and `logging.properties` from [`playground/backend/`](/playground/backend/) directory: +```shell +cp -r {logging.properties,datasets/,configs/} ~/path/to/workdir +``` +If you want to start the backend for the Go SDK, you will additionally need to create a prepared mod dir and export an additional environment variable: ```shell +export PREPARED_MOD_DIR=~/path/to/workdir/prepared_folder +SDK_TAG=2.44.0 bash ./containers/go/setup_sdk.sh $PREPARED_MOD_DIR +``` + +The following command will build and serve the backend locally: + +```shell +SERVER_PORT= \ +BEAM_SDK= \ +APP_WORK_DIR= \ DATASTORE_EMULATOR_HOST=127.0.0.1:8888 \ DATASTORE_PROJECT_ID=test \ SDK_CONFIG=../sdks-emulator.yaml \ -BEAM_SDK="SDK_UNSPECIFIED" \ -APP_WORK_DIR= \ go run ./cmd/server ``` +where `` should be the value of port on which you want to have the backend server availalbe; `` is a value of desired Beam SDK, possible values are `SDK_UNSPECIFIED`, `SDK_JAVA`, `SDK_PYTHON`, `SDK_GO`, `SDK_SCIO`; `` should be set to path to yoru work dir, e.g. `~/path/to/workdir`. 
+ Run the following command to generate a release build file: ```shell From 5e8a2de73b9fc19d25d8c2f7c9b239fcd81b48ef Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Fri, 17 Feb 2023 18:28:45 +0400 Subject: [PATCH 14/17] Improve consistency in code blocks in backend Readme --- playground/backend/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/playground/backend/README.md b/playground/backend/README.md index 916127dc4f47..94209c3a65de 100644 --- a/playground/backend/README.md +++ b/playground/backend/README.md @@ -45,19 +45,19 @@ This section describes what is needed to run the backend application. Start datastore emulator ```shell -$ bash start_datastore_emulator.sh +bash start_datastore_emulator.sh ``` After you have finished running tests ```shell -$ bash stop_datastore_emulator.sh +bash stop_datastore_emulator.sh ``` ### Run/build Go to the backend directory: ```shell -$ cd backend +cd backend ``` To run backend server on development machine without using docker you'll need first to prepare a working directory anywhere outside of Beam source tree: @@ -92,14 +92,14 @@ where `` should be the value of port on which you want to have the backend Run the following command to generate a release build file: ```shell -$ go build ./cmd/server/server.go +go build ./cmd/server/server.go ``` ### Test Playground tests may be run using this command: ```shell -$ go test ./... -v +go test ./... -v ``` The full list of commands can be found [here](https://pkg.go.dev/cmd/go). 
From 4f68a76f4507a6ce7c9ef841caac0c0283fb863c Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Tue, 21 Feb 2023 13:06:32 +0400 Subject: [PATCH 15/17] Fixing trailing whitespace --- playground/backend/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/playground/backend/README.md b/playground/backend/README.md index 94209c3a65de..db387abab32e 100644 --- a/playground/backend/README.md +++ b/playground/backend/README.md @@ -79,7 +79,7 @@ The following command will build and serve the backend locally: ```shell SERVER_PORT= \ -BEAM_SDK= \ +BEAM_SDK= \ APP_WORK_DIR= \ DATASTORE_EMULATOR_HOST=127.0.0.1:8888 \ DATASTORE_PROJECT_ID=test \ From 36e42cf30ea5b4d037b59c0b296715e1e265a0b8 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Wed, 22 Mar 2023 23:55:12 +0400 Subject: [PATCH 16/17] Update playground/backend/README.md Co-authored-by: Danny McCormick --- playground/backend/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/playground/backend/README.md b/playground/backend/README.md index db387abab32e..6659e948d754 100644 --- a/playground/backend/README.md +++ b/playground/backend/README.md @@ -87,7 +87,7 @@ SDK_CONFIG=../sdks-emulator.yaml \ go run ./cmd/server ``` -where `` should be the value of port on which you want to have the backend server availalbe; `` is a value of desired Beam SDK, possible values are `SDK_UNSPECIFIED`, `SDK_JAVA`, `SDK_PYTHON`, `SDK_GO`, `SDK_SCIO`; `` should be set to path to yoru work dir, e.g. `~/path/to/workdir`. +where `` should be the value of port on which you want to have the backend server available; `` is a value of desired Beam SDK, possible values are `SDK_UNSPECIFIED`, `SDK_JAVA`, `SDK_PYTHON`, `SDK_GO`, `SDK_SCIO`; `` should be set to path to your work dir, e.g. `~/path/to/workdir`. 
Run the following command to generate a release build file: From f9dbb51128143dfe1f53c35f3f43d094165e60f4 Mon Sep 17 00:00:00 2001 From: Timur Sultanov Date: Wed, 22 Mar 2023 23:55:22 +0400 Subject: [PATCH 17/17] Update playground/backend/internal/utils/preparers_utils_test.go Co-authored-by: Danny McCormick --- playground/backend/internal/utils/preparers_utils_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/playground/backend/internal/utils/preparers_utils_test.go b/playground/backend/internal/utils/preparers_utils_test.go index e2f5d7a9affb..72f0eab5cb41 100644 --- a/playground/backend/internal/utils/preparers_utils_test.go +++ b/playground/backend/internal/utils/preparers_utils_test.go @@ -64,9 +64,6 @@ func setup() error { if err != nil { return err } - if err != nil { - return err - } emptyFilePath := filepath.Join(sourceDir, emptyFileName) err = os.WriteFile(emptyFilePath, []byte(""), filePermission) if err != nil {