diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py index c90f02abd3b5..06e5b473a376 100644 --- a/playground/infrastructure/cd_helper.py +++ b/playground/infrastructure/cd_helper.py @@ -26,6 +26,7 @@ from pathlib import Path from typing import List +from tqdm import tqdm from google.cloud import storage from api.v1.api_pb2 import Sdk @@ -40,12 +41,18 @@ class CDHelper: It is used to save beam examples/katas/tests and their output on the GCS. """ + def store_examples(self, examples: List[Example]): """ Store beam examples and their output in the Google Cloud. """ + logging.info("Start of executing Playground examples ...") asyncio.run(self._get_outputs(examples)) + logging.info("Finish of executing Playground examples") + + logging.info("Start of sending Playground examples to the bucket ...") self._save_to_cloud_storage(examples) + logging.info("Finish of sending Playground examples to the bucket") self._clear_temp_folder() async def _get_outputs(self, examples: List[Example]): @@ -71,7 +78,7 @@ async def _get_outputs(self, examples: List[Example]): example.output = output for log, example in zip(logs, examples): - example.logs = log + example.logs = log def _save_to_cloud_storage(self, examples: List[Example]): """ @@ -82,7 +89,7 @@ def _save_to_cloud_storage(self, examples: List[Example]): """ self._storage_client = storage.Client() self._bucket = self._storage_client.bucket(Config.BUCKET_NAME) - for example in examples: + for example in tqdm(examples): file_names = self._write_to_local_fs(example) for cloud_file_name, local_file_name in file_names.items(): self._upload_blob( @@ -117,10 +124,10 @@ def _write_to_local_fs(self, example: Example): file_name=example.tag.name, extension=PrecompiledExample.OUTPUT_EXTENSION) log_path = self._get_gcs_object_name( - sdk=example.sdk, - base_folder_name=example.tag.name, - file_name=example.tag.name, - extension=PrecompiledExample.LOG_EXTENSION) + sdk=example.sdk, + base_folder_name=example.tag.name, + file_name=example.tag.name, + extension=PrecompiledExample.LOG_EXTENSION) meta_path = self._get_gcs_object_name( sdk=example.sdk, base_folder_name=example.tag.name, diff --git a/playground/infrastructure/ci_cd.py b/playground/infrastructure/ci_cd.py index f6d83ab8d6f2..1d94e0c2b565 100644 --- a/playground/infrastructure/ci_cd.py +++ b/playground/infrastructure/ci_cd.py @@ -18,6 +18,7 @@ """ import argparse import asyncio +import logging import os from typing import List @@ -77,11 +78,19 @@ def _check_envs(): def _run_ci_cd(step: config.Config.CI_CD_LITERAL, sdk: Sdk): supported_categories = get_supported_categories(categories_file) + logging.info("Start of searching Playground examples ...") examples = find_examples(root_dir, supported_categories, sdk) + logging.info("Finish of searching Playground examples") + logging.info("Number of found Playground examples: %s", len(examples)) + if step == config.Config.CI_STEP_NAME: + logging.info("Start of verification Playground examples ...") _ci_step(examples=examples) + logging.info("Finish of verification Playground examples") if step == config.Config.CD_STEP_NAME: + logging.info("Start of storing Playground examples ...") _cd_step(examples=examples) + logging.info("Finish of storing Playground examples") if __name__ == "__main__": diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/ci_helper.py index 71cf71cda567..08677c2bd286 100644 --- a/playground/infrastructure/ci_helper.py +++ b/playground/infrastructure/ci_helper.py @@ -36,6 +36,7 @@ class CIHelper: It is used to find and verify correctness if beam examples/katas/tests. """ + async def verify_examples(self, examples: List[Example]): """ Verify correctness of beam examples. @@ -62,10 +63,12 @@ async def _verify_examples_status(self, examples: List[Example]): Args: examples: beam examples that should be verified """ + count_of_verified = 0 client = GRPCClient() verify_failed = False for example in examples: if example.status not in Config.ERROR_STATUSES: + count_of_verified += 1 continue if example.status == STATUS_VALIDATION_ERROR: logging.error("Example: %s has validation error", example.filepath) @@ -77,13 +80,22 @@ async def _verify_examples_status(self, examples: List[Example]): elif example.status == STATUS_RUN_TIMEOUT: logging.error("Example: %s failed because of timeout", example.filepath) elif example.status == STATUS_COMPILE_ERROR: - err = await client.get_compile_output(example.filepath) + err = await client.get_compile_output(example.pipeline_id) logging.error( "Example: %s has compilation error: %s", example.filepath, err) elif example.status == STATUS_RUN_ERROR: - err = await client.get_run_error(example.filepath) + err = await client.get_run_error(example.pipeline_id) logging.error( "Example: %s has execution error: %s", example.filepath, err) verify_failed = True + + logging.info( + "Number of verified Playground examples: %s / %s", + count_of_verified, + len(examples)) + logging.info( + "Number of Playground examples with some error: %s / %s", + len(examples) - count_of_verified, + len(examples)) if verify_failed: raise Exception("CI step failed due to errors in the examples") diff --git a/playground/infrastructure/grpc_client.py b/playground/infrastructure/grpc_client.py index bc6026c0c95c..4f9496206c7a 100644 --- a/playground/infrastructure/grpc_client.py +++ b/playground/infrastructure/grpc_client.py @@ -27,6 +27,7 @@ class GRPCClient: """GRPCClient is gRPC client for sending a request to the backend.""" + def __init__(self): self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS) self._stub = api_pb2_grpc.PlaygroundServiceStub(self._channel) diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py index 551ef8d6b74c..12b0699774bf 100644 --- a/playground/infrastructure/helper.py +++ b/playground/infrastructure/helper.py @@ -22,8 +22,9 @@ import os from collections import namedtuple from dataclasses import dataclass, fields -from typing import List, Optional, Dict, Union +from typing import List, Optional, Dict +from tqdm.asyncio import tqdm import yaml from yaml import YAMLError @@ -128,7 +129,7 @@ async def get_statuses(examples: List[Example]): client = GRPCClient() for example in examples: tasks.append(_update_example_status(example, client)) - await asyncio.gather(*tasks) + await tqdm.gather(*tasks) def get_tag(filepath) -> Optional[ExampleTag]: @@ -150,7 +151,8 @@ def get_tag(filepath) -> Optional[ExampleTag]: lines = parsed_file.readlines() for line in lines: - formatted_line = line.replace("//", "").replace("#", "") + formatted_line = line.replace("//", "").replace("#", + "").replace("\t", " ") if add_to_yaml is False: if formatted_line.lstrip() == Config.BEAM_PLAYGROUND_TITLE: add_to_yaml = True @@ -221,8 +223,7 @@ def get_supported_categories(categories_path: str) -> List[str]: return yaml_object[TagFields.categories] -def _get_example( - filepath: str, filename: str, tag: ExampleTag) -> Example: +def _get_example(filepath: str, filename: str, tag: ExampleTag) -> Example: """ Return an Example by filepath and filename. @@ -278,14 +279,15 @@ def _validate(tag: dict, supported_categories: List[str]) -> bool: field.default, tag.__str__()) valid = False - - value = tag.get(field.default) - if value == "" or value is None: - logging.error( - "tag's value is incorrect: %s\nvalue for %s field can not be empty.", - tag.__str__(), - field.default.__str__()) - valid = False + if valid is True: + value = tag.get(field.default) + if (value == "" or + value is None) and field.default != TagFields.pipeline_options: + logging.error( + "tag's value is incorrect: %s\n%s field can not be empty.", + tag.__str__(), + field.default.__str__()) + valid = False if valid is False: return valid @@ -353,7 +355,7 @@ async def _update_example_status(example: Example, client: GRPCClient): client: client to send requests to the server. """ pipeline_id = await client.run_code( - example.code, example.sdk, example.tag[TagFields.pipeline_options]) + example.code, example.sdk, example.tag.pipeline_options) example.pipeline_id = pipeline_id status = await client.check_status(pipeline_id) while status in [STATUS_VALIDATING, diff --git a/playground/infrastructure/requirements.txt b/playground/infrastructure/requirements.txt index c9aa1cdd81ca..fb09e8beecfa 100644 --- a/playground/infrastructure/requirements.txt +++ b/playground/infrastructure/requirements.txt @@ -22,4 +22,5 @@ protobuf==3.19.1 pytest==6.2.5 pytest-mock==3.6.1 PyYAML==6.0 -google-cloud-storage==1.43.0 \ No newline at end of file +google-cloud-storage==1.43.0 +tqdm~=4.62.3 \ No newline at end of file diff --git a/playground/infrastructure/test_cd_helper.py b/playground/infrastructure/test_cd_helper.py index 91f8d720951e..39e51b815b2c 100644 --- a/playground/infrastructure/test_cd_helper.py +++ b/playground/infrastructure/test_cd_helper.py @@ -58,9 +58,11 @@ def test__get_gcs_object_name(): def test__write_to_local_fs(delete_temp_folder): """ - Test writing code of an example, output and meta info to the filesystem (in temp folder) + Test writing code of an example, output and meta info to + the filesystem (in temp folder) Args: - delete_temp_folder: python fixture to clean up temp folder after method execution + delete_temp_folder: python fixture to clean up temp folder + after method execution """ object_meta = { "name": "name", diff --git a/playground/infrastructure/test_grpc_client.py b/playground/infrastructure/test_grpc_client.py index c7a795165cbd..113cbb23251c 100644 --- a/playground/infrastructure/test_grpc_client.py +++ b/playground/infrastructure/test_grpc_client.py @@ -59,6 +59,7 @@ def mock_get_compile_output(mocker): class TestGRPCClient: + @pytest.mark.asyncio async def test_run_code(self, mock_run_code): result = await GRPCClient().run_code("", api_pb2.SDK_GO, "") diff --git a/playground/infrastructure/test_helper.py b/playground/infrastructure/test_helper.py index f2c69d392be3..29abca1bf1b1 100644 --- a/playground/infrastructure/test_helper.py +++ b/playground/infrastructure/test_helper.py @@ -20,7 +20,7 @@ from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, \ STATUS_VALIDATING, \ - STATUS_FINISHED, SDK_JAVA, SDK_PYTHON, SDK_GO, \ + STATUS_FINISHED, SDK_JAVA, \ PRECOMPILED_OBJECT_TYPE_EXAMPLE, PRECOMPILED_OBJECT_TYPE_KATA, \ PRECOMPILED_OBJECT_TYPE_UNIT_TEST from grpc_client import GRPCClient @@ -38,7 +38,7 @@ def test_find_examples_with_valid_tag(mock_os_walk, mock_check_file): sdk = SDK_UNSPECIFIED result = find_examples(work_dir="", supported_categories=[], sdk=sdk) - assert result == [] + assert not result mock_os_walk.assert_called_once_with("") mock_check_file.assert_called_once_with( examples=[], @@ -56,9 +56,8 @@ def test_find_examples_with_invalid_tag(mock_os_walk, mock_check_file): sdk = SDK_UNSPECIFIED with pytest.raises( ValueError, - match= - "Some of the beam examples contain beam playground tag with an incorrect format" - ): + match="Some of the beam examples contain beam playground tag with " + "an incorrect format"): find_examples("", [], sdk=sdk) mock_os_walk.assert_called_once_with("") @@ -171,22 +170,16 @@ def test_get_supported_categories(): @mock.patch("helper._get_name") def test__get_example(mock_get_name): mock_get_name.return_value = "filepath" - tag = ExampleTag( - { - "name": "Name", - "description": "Description", - "multifile": "False", - "categories": [""], - "pipeline_options": "--option option" - }, - "" - ) - - result = _get_example( - "/root/filepath.java", - "filepath.java", - tag - ) + tag = ExampleTag({ + "name": "Name", + "description": "Description", + "multifile": "False", + "categories": [""], + "pipeline_options": "--option option" + }, + "") + + result = _get_example("/root/filepath.java", "filepath.java", tag) assert result == Example( name="filepath", @@ -284,7 +277,8 @@ async def test__update_example_status( assert example.pipeline_id == "pipeline_id" assert example.status == STATUS_FINISHED - mock_grpc_client_run_code.assert_called_once_with(example.code, example.sdk, "--key value") + mock_grpc_client_run_code.assert_called_once_with( + example.code, example.sdk, "--key value") mock_grpc_client_check_status.assert_has_calls([mock.call("pipeline_id")])