From cbb05f5d3e7a64478b21b15967f66ecd4d9aab3e Mon Sep 17 00:00:00 2001 From: AydarZaynutdinov Date: Tue, 11 Jan 2022 21:36:58 +0300 Subject: [PATCH 1/6] [BEAM-13544][Playground] Add logs for the CI/CD steps Fix minor issues --- playground/infrastructure/cd_helper.py | 10 ++++++++++ playground/infrastructure/ci_cd.py | 9 +++++++++ playground/infrastructure/ci_helper.py | 12 ++++++++---- playground/infrastructure/helper.py | 23 ++++++++++++----------- 4 files changed, 39 insertions(+), 15 deletions(-) diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py index c90f02abd3b5..af0c5acfde16 100644 --- a/playground/infrastructure/cd_helper.py +++ b/playground/infrastructure/cd_helper.py @@ -26,8 +26,10 @@ from pathlib import Path from typing import List +import tqdm from google.cloud import storage +import logger from api.v1.api_pb2 import Sdk from config import Config, PrecompiledExample from grpc_client import GRPCClient @@ -44,8 +46,13 @@ def store_examples(self, examples: List[Example]): """ Store beam examples and their output in the Google Cloud. """ + logger.info("Start of executing Playground examples ...") asyncio.run(self._get_outputs(examples)) + logger.info("Start of executing Playground examples ...") + + logger.info("Start of sending Playground examples to the bucket ...") self._save_to_cloud_storage(examples) + logger.info("Finish of sending Playground examples to the bucket") self._clear_temp_folder() async def _get_outputs(self, examples: List[Example]): @@ -82,11 +89,14 @@ def _save_to_cloud_storage(self, examples: List[Example]): """ self._storage_client = storage.Client() self._bucket = self._storage_client.bucket(Config.BUCKET_NAME) + pbar = tqdm(total=len(examples)) for example in examples: file_names = self._write_to_local_fs(example) for cloud_file_name, local_file_name in file_names.items(): self._upload_blob( source_file=local_file_name, destination_blob_name=cloud_file_name) + pbar.update(1) + pbar.close() def _write_to_local_fs(self, example: Example): """ diff --git a/playground/infrastructure/ci_cd.py b/playground/infrastructure/ci_cd.py index f6d83ab8d6f2..4aaf5ef2359c 100644 --- a/playground/infrastructure/ci_cd.py +++ b/playground/infrastructure/ci_cd.py @@ -18,6 +18,7 @@ """ import argparse import asyncio +import logging import os from typing import List @@ -77,11 +78,19 @@ def _check_envs(): def _run_ci_cd(step: config.Config.CI_CD_LITERAL, sdk: Sdk): supported_categories = get_supported_categories(categories_file) + logging.info("Start of searching Playground examples ...") examples = find_examples(root_dir, supported_categories, sdk) + logging.info("Finish of searching Playground examples") + logging.info("Count of found Playground examples: %s", len(examples)) + if step == config.Config.CI_STEP_NAME: + logging.info("Start of verification Playground examples ...") _ci_step(examples=examples) + logging.info("Finish of verification Playground examples") if step == config.Config.CD_STEP_NAME: + logging.info("Start of storing Playground examples ...") _cd_step(examples=examples) + logging.info("Finish of storing Playground examples") if __name__ == "__main__": diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/ci_helper.py index 71cf71cda567..26cb29df5db7 100644 --- a/playground/infrastructure/ci_helper.py +++ b/playground/infrastructure/ci_helper.py @@ -62,10 +62,12 @@ async def _verify_examples_status(self, examples: List[Example]): Args: examples: beam examples that should be verified """ + count_of_verified = 0 client = GRPCClient() verify_failed = False for example in examples: if example.status not in Config.ERROR_STATUSES: + count_of_verified += 1 continue if example.status == STATUS_VALIDATION_ERROR: logging.error("Example: %s has validation error", example.filepath) @@ -77,13 +79,15 @@ async def _verify_examples_status(self, examples: List[Example]): elif example.status == STATUS_RUN_TIMEOUT: logging.error("Example: %s failed because of timeout", example.filepath) elif example.status == STATUS_COMPILE_ERROR: - err = await client.get_compile_output(example.filepath) + err = await client.get_compile_output(example.pipeline_id) logging.error( "Example: %s has compilation error: %s", example.filepath, err) elif example.status == STATUS_RUN_ERROR: - err = await client.get_run_error(example.filepath) - logging.error( - "Example: %s has execution error: %s", example.filepath, err) + err = await client.get_run_error(example.pipeline_id) + logging.error("Example: %s has execution error: %s", example.filepath, err) verify_failed = True + + logging.info("Count of verified Playground examples: %s / %s", count_of_verified, len(examples)) + logging.info("Count of Playground examples with some error: %s / %s", len(examples) - count_of_verified, len(examples)) if verify_failed: raise Exception("CI step failed due to errors in the examples") diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py index 551ef8d6b74c..6a6f0a7ae72d 100644 --- a/playground/infrastructure/helper.py +++ b/playground/infrastructure/helper.py @@ -22,8 +22,9 @@ import os from collections import namedtuple from dataclasses import dataclass, fields -from typing import List, Optional, Dict, Union +from typing import List, Optional, Dict +from tqdm.asyncio import tqdm import yaml from yaml import YAMLError @@ -128,7 +129,7 @@ async def get_statuses(examples: List[Example]): client = GRPCClient() for example in examples: tasks.append(_update_example_status(example, client)) - await asyncio.gather(*tasks) + await tqdm.gather(*tasks) def get_tag(filepath) -> Optional[ExampleTag]: @@ -278,14 +279,14 @@ def _validate(tag: dict, supported_categories: List[str]) -> bool: field.default, tag.__str__()) valid = False - - value = tag.get(field.default) - if value == "" or value is None: - logging.error( - "tag's value is incorrect: %s\nvalue for %s field can not be empty.", - tag.__str__(), - field.default.__str__()) - valid = False + if valid is True: + value = tag.get(field.default) + if (value == "" or value is None) and field.default != TagFields.pipeline_options: + logging.error( + "tag's value is incorrect: %s\nvalue for %s field can not be empty.", + tag.__str__(), + field.default.__str__()) + valid = False if valid is False: return valid @@ -353,7 +354,7 @@ async def _update_example_status(example: Example, client: GRPCClient): client: client to send requests to the server. """ pipeline_id = await client.run_code( - example.code, example.sdk, example.tag[TagFields.pipeline_options]) + example.code, example.sdk, example.tag.pipeline_options) example.pipeline_id = pipeline_id status = await client.check_status(pipeline_id) while status in [STATUS_VALIDATING, From 626195e8ced0699b5035599a2d234b98b793950a Mon Sep 17 00:00:00 2001 From: AydarZaynutdinov Date: Wed, 12 Jan 2022 10:49:50 +0300 Subject: [PATCH 2/6] [BEAM-13544][Playground] update logs' messages --- playground/infrastructure/cd_helper.py | 2 +- playground/infrastructure/ci_cd.py | 2 +- playground/infrastructure/ci_helper.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py index af0c5acfde16..aec63721593b 100644 --- a/playground/infrastructure/cd_helper.py +++ b/playground/infrastructure/cd_helper.py @@ -48,7 +48,7 @@ def store_examples(self, examples: List[Example]): """ logger.info("Start of executing Playground examples ...") asyncio.run(self._get_outputs(examples)) - logger.info("Start of executing Playground examples ...") + logger.info("Finish of executing Playground examples") logger.info("Start of sending Playground examples to the bucket ...") self._save_to_cloud_storage(examples) diff --git a/playground/infrastructure/ci_cd.py b/playground/infrastructure/ci_cd.py index 4aaf5ef2359c..1d94e0c2b565 100644 --- a/playground/infrastructure/ci_cd.py +++ b/playground/infrastructure/ci_cd.py @@ -81,7 +81,7 @@ def _run_ci_cd(step: config.Config.CI_CD_LITERAL, sdk: Sdk): logging.info("Start of searching Playground examples ...") examples = find_examples(root_dir, supported_categories, sdk) logging.info("Finish of searching Playground examples") - logging.info("Count of found Playground examples: %s", len(examples)) + logging.info("Number of found Playground examples: %s", len(examples)) if step == config.Config.CI_STEP_NAME: logging.info("Start of verification Playground examples ...") diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/ci_helper.py index 26cb29df5db7..ad9bcf036bdd 100644 --- a/playground/infrastructure/ci_helper.py +++ b/playground/infrastructure/ci_helper.py @@ -87,7 +87,7 @@ async def _verify_examples_status(self, examples: List[Example]): logging.error("Example: %s has execution error: %s", example.filepath, err) verify_failed = True - logging.info("Count of verified Playground examples: %s / %s", count_of_verified, len(examples)) - logging.info("Count of Playground examples with some error: %s / %s", len(examples) - count_of_verified, len(examples)) + logging.info("Number of verified Playground examples: %s / %s", count_of_verified, len(examples)) + logging.info("Number of Playground examples with some error: %s / %s", len(examples) - count_of_verified, len(examples)) if verify_failed: raise Exception("CI step failed due to errors in the examples") From 06e34b73966da90128ce742a71f7088283a3cb80 Mon Sep 17 00:00:00 2001 From: AydarZaynutdinov Date: Wed, 12 Jan 2022 15:26:48 +0300 Subject: [PATCH 3/6] [BEAM-13544][Playground] Update using of `tqdm` Add `tqdm` to requirements.txt --- playground/infrastructure/cd_helper.py | 7 ++----- playground/infrastructure/requirements.txt | 3 ++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py index aec63721593b..d5b1df134201 100644 --- a/playground/infrastructure/cd_helper.py +++ b/playground/infrastructure/cd_helper.py @@ -26,7 +26,7 @@ from pathlib import Path from typing import List -import tqdm +from tqdm import tqdm from google.cloud import storage import logger @@ -89,14 +89,11 @@ def _save_to_cloud_storage(self, examples: List[Example]): """ self._storage_client = storage.Client() self._bucket = self._storage_client.bucket(Config.BUCKET_NAME) - pbar = tqdm(total=len(examples)) - for example in examples: + for example in tqdm(examples): file_names = self._write_to_local_fs(example) for cloud_file_name, local_file_name in file_names.items(): self._upload_blob( source_file=local_file_name, destination_blob_name=cloud_file_name) - pbar.update(1) - pbar.close() def _write_to_local_fs(self, example: Example): """ diff --git a/playground/infrastructure/requirements.txt b/playground/infrastructure/requirements.txt index c9aa1cdd81ca..fb09e8beecfa 100644 --- a/playground/infrastructure/requirements.txt +++ b/playground/infrastructure/requirements.txt @@ -22,4 +22,5 @@ protobuf==3.19.1 pytest==6.2.5 pytest-mock==3.6.1 PyYAML==6.0 -google-cloud-storage==1.43.0 \ No newline at end of file +google-cloud-storage==1.43.0 +tqdm~=4.62.3 \ No newline at end of file From 805308be557202af55076d65cbd0c4f09e209cee Mon Sep 17 00:00:00 2001 From: AydarZaynutdinov Date: Thu, 13 Jan 2022 15:28:59 +0300 Subject: [PATCH 4/6] [BEAM-13544][Playground] Add logic to replace `\t` with spaces for tag --- playground/infrastructure/helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py index 6a6f0a7ae72d..ad0cfa70fc24 100644 --- a/playground/infrastructure/helper.py +++ b/playground/infrastructure/helper.py @@ -151,7 +151,7 @@ def get_tag(filepath) -> Optional[ExampleTag]: lines = parsed_file.readlines() for line in lines: - formatted_line = line.replace("//", "").replace("#", "") + formatted_line = line.replace("//", "").replace("#", "").replace("\t", " ") if add_to_yaml is False: if formatted_line.lstrip() == Config.BEAM_PLAYGROUND_TITLE: add_to_yaml = True From a7a11734a391719c97ecb254ee79af15a6374ab1 Mon Sep 17 00:00:00 2001 From: AydarZaynutdinov Date: Thu, 13 Jan 2022 18:42:38 +0300 Subject: [PATCH 5/6] [BEAM-13544][Playground] Change using `logger` to `logging` --- playground/infrastructure/cd_helper.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py index d5b1df134201..78032696a191 100644 --- a/playground/infrastructure/cd_helper.py +++ b/playground/infrastructure/cd_helper.py @@ -29,7 +29,6 @@ from tqdm import tqdm from google.cloud import storage -import logger from api.v1.api_pb2 import Sdk from config import Config, PrecompiledExample from grpc_client import GRPCClient @@ -46,13 +45,13 @@ def store_examples(self, examples: List[Example]): """ Store beam examples and their output in the Google Cloud. """ - logger.info("Start of executing Playground examples ...") + logging.info("Start of executing Playground examples ...") asyncio.run(self._get_outputs(examples)) - logger.info("Finish of executing Playground examples") + logging.info("Finish of executing Playground examples") - logger.info("Start of sending Playground examples to the bucket ...") + logging.info("Start of sending Playground examples to the bucket ...") self._save_to_cloud_storage(examples) - logger.info("Finish of sending Playground examples to the bucket") + logging.info("Finish of sending Playground examples to the bucket") self._clear_temp_folder() async def _get_outputs(self, examples: List[Example]): From e817e76335aa53c1f9b40f405c2f5df06f5c8bc1 Mon Sep 17 00:00:00 2001 From: AydarZaynutdinov Date: Mon, 17 Jan 2022 17:24:53 +0300 Subject: [PATCH 6/6] [BEAM-13544][Playground] Fix according to `linter` and `yapf` --- playground/infrastructure/cd_helper.py | 11 +++--- playground/infrastructure/ci_helper.py | 14 +++++-- playground/infrastructure/grpc_client.py | 1 + playground/infrastructure/helper.py | 11 +++--- playground/infrastructure/test_cd_helper.py | 6 ++- playground/infrastructure/test_grpc_client.py | 1 + playground/infrastructure/test_helper.py | 38 ++++++++----------- 7 files changed, 45 insertions(+), 37 deletions(-) diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py index 78032696a191..06e5b473a376 100644 --- a/playground/infrastructure/cd_helper.py +++ b/playground/infrastructure/cd_helper.py @@ -41,6 +41,7 @@ class CDHelper: It is used to save beam examples/katas/tests and their output on the GCS. """ + def store_examples(self, examples: List[Example]): """ Store beam examples and their output in the Google Cloud. @@ -77,7 +78,7 @@ async def _get_outputs(self, examples: List[Example]): example.output = output for log, example in zip(logs, examples): - example.logs = log + example.logs = log def _save_to_cloud_storage(self, examples: List[Example]): """ @@ -123,10 +124,10 @@ def _write_to_local_fs(self, example: Example): file_name=example.tag.name, extension=PrecompiledExample.OUTPUT_EXTENSION) log_path = self._get_gcs_object_name( - sdk=example.sdk, - base_folder_name=example.tag.name, - file_name=example.tag.name, - extension=PrecompiledExample.LOG_EXTENSION) + sdk=example.sdk, + base_folder_name=example.tag.name, + file_name=example.tag.name, + extension=PrecompiledExample.LOG_EXTENSION) meta_path = self._get_gcs_object_name( sdk=example.sdk, base_folder_name=example.tag.name, diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/ci_helper.py index ad9bcf036bdd..08677c2bd286 100644 --- a/playground/infrastructure/ci_helper.py +++ b/playground/infrastructure/ci_helper.py @@ -36,6 +36,7 @@ class CIHelper: It is used to find and verify correctness if beam examples/katas/tests. """ + async def verify_examples(self, examples: List[Example]): """ Verify correctness of beam examples. @@ -84,10 +85,17 @@ async def _verify_examples_status(self, examples: List[Example]): "Example: %s has compilation error: %s", example.filepath, err) elif example.status == STATUS_RUN_ERROR: err = await client.get_run_error(example.pipeline_id) - logging.error("Example: %s has execution error: %s", example.filepath, err) + logging.error( + "Example: %s has execution error: %s", example.filepath, err) verify_failed = True - logging.info("Number of verified Playground examples: %s / %s", count_of_verified, len(examples)) - logging.info("Number of Playground examples with some error: %s / %s", len(examples) - count_of_verified, len(examples)) + logging.info( + "Number of verified Playground examples: %s / %s", + count_of_verified, + len(examples)) + logging.info( + "Number of Playground examples with some error: %s / %s", + len(examples) - count_of_verified, + len(examples)) if verify_failed: raise Exception("CI step failed due to errors in the examples") diff --git a/playground/infrastructure/grpc_client.py b/playground/infrastructure/grpc_client.py index bc6026c0c95c..4f9496206c7a 100644 --- a/playground/infrastructure/grpc_client.py +++ b/playground/infrastructure/grpc_client.py @@ -27,6 +27,7 @@ class GRPCClient: """GRPCClient is gRPC client for sending a request to the backend.""" + def __init__(self): self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS) self._stub = api_pb2_grpc.PlaygroundServiceStub(self._channel) diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py index ad0cfa70fc24..12b0699774bf 100644 --- a/playground/infrastructure/helper.py +++ b/playground/infrastructure/helper.py @@ -151,7 +151,8 @@ def get_tag(filepath) -> Optional[ExampleTag]: lines = parsed_file.readlines() for line in lines: - formatted_line = line.replace("//", "").replace("#", "").replace("\t", " ") + formatted_line = line.replace("//", "").replace("#", + "").replace("\t", " ") if add_to_yaml is False: if formatted_line.lstrip() == Config.BEAM_PLAYGROUND_TITLE: add_to_yaml = True @@ -222,8 +223,7 @@ def get_supported_categories(categories_path: str) -> List[str]: return yaml_object[TagFields.categories] -def _get_example( - filepath: str, filename: str, tag: ExampleTag) -> Example: +def _get_example(filepath: str, filename: str, tag: ExampleTag) -> Example: """ Return an Example by filepath and filename. @@ -281,9 +281,10 @@ def _validate(tag: dict, supported_categories: List[str]) -> bool: valid = False if valid is True: value = tag.get(field.default) - if (value == "" or value is None) and field.default != TagFields.pipeline_options: + if (value == "" or + value is None) and field.default != TagFields.pipeline_options: logging.error( - "tag's value is incorrect: %s\nvalue for %s field can not be empty.", + "tag's value is incorrect: %s\n%s field can not be empty.", tag.__str__(), field.default.__str__()) valid = False diff --git a/playground/infrastructure/test_cd_helper.py b/playground/infrastructure/test_cd_helper.py index 91f8d720951e..39e51b815b2c 100644 --- a/playground/infrastructure/test_cd_helper.py +++ b/playground/infrastructure/test_cd_helper.py @@ -58,9 +58,11 @@ def test__get_gcs_object_name(): def test__write_to_local_fs(delete_temp_folder): """ - Test writing code of an example, output and meta info to the filesystem (in temp folder) + Test writing code of an example, output and meta info to + the filesystem (in temp folder) Args: - delete_temp_folder: python fixture to clean up temp folder after method execution + delete_temp_folder: python fixture to clean up temp folder + after method execution """ object_meta = { "name": "name", diff --git a/playground/infrastructure/test_grpc_client.py b/playground/infrastructure/test_grpc_client.py index c7a795165cbd..113cbb23251c 100644 --- a/playground/infrastructure/test_grpc_client.py +++ b/playground/infrastructure/test_grpc_client.py @@ -59,6 +59,7 @@ def mock_get_compile_output(mocker): class TestGRPCClient: + @pytest.mark.asyncio async def test_run_code(self, mock_run_code): result = await GRPCClient().run_code("", api_pb2.SDK_GO, "") diff --git a/playground/infrastructure/test_helper.py b/playground/infrastructure/test_helper.py index f2c69d392be3..29abca1bf1b1 100644 --- a/playground/infrastructure/test_helper.py +++ b/playground/infrastructure/test_helper.py @@ -20,7 +20,7 @@ from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, \ STATUS_VALIDATING, \ - STATUS_FINISHED, SDK_JAVA, SDK_PYTHON, SDK_GO, \ + STATUS_FINISHED, SDK_JAVA, \ PRECOMPILED_OBJECT_TYPE_EXAMPLE, PRECOMPILED_OBJECT_TYPE_KATA, \ PRECOMPILED_OBJECT_TYPE_UNIT_TEST from grpc_client import GRPCClient @@ -38,7 +38,7 @@ def test_find_examples_with_valid_tag(mock_os_walk, mock_check_file): sdk = SDK_UNSPECIFIED result = find_examples(work_dir="", supported_categories=[], sdk=sdk) - assert result == [] + assert not result mock_os_walk.assert_called_once_with("") mock_check_file.assert_called_once_with( examples=[], @@ -56,9 +56,8 @@ def test_find_examples_with_invalid_tag(mock_os_walk, mock_check_file): sdk = SDK_UNSPECIFIED with pytest.raises( ValueError, - match= - "Some of the beam examples contain beam playground tag with an incorrect format" - ): + match="Some of the beam examples contain beam playground tag with " + "an incorrect format"): find_examples("", [], sdk=sdk) mock_os_walk.assert_called_once_with("") @@ -171,22 +170,16 @@ def test_get_supported_categories(): @mock.patch("helper._get_name") def test__get_example(mock_get_name): mock_get_name.return_value = "filepath" - tag = ExampleTag( - { - "name": "Name", - "description": "Description", - "multifile": "False", - "categories": [""], - "pipeline_options": "--option option" - }, - "" - ) - - result = _get_example( - "/root/filepath.java", - "filepath.java", - tag - ) + tag = ExampleTag({ + "name": "Name", + "description": "Description", + "multifile": "False", + "categories": [""], + "pipeline_options": "--option option" + }, + "") + + result = _get_example("/root/filepath.java", "filepath.java", tag) assert result == Example( name="filepath", @@ -284,7 +277,8 @@ async def test__update_example_status( assert example.pipeline_id == "pipeline_id" assert example.status == STATUS_FINISHED - mock_grpc_client_run_code.assert_called_once_with(example.code, example.sdk, "--key value") + mock_grpc_client_run_code.assert_called_once_with( + example.code, example.sdk, "--key value") mock_grpc_client_check_status.assert_has_calls([mock.call("pipeline_id")])