diff --git a/playground/infrastructure/ci_cd.py b/playground/infrastructure/ci_cd.py index 9744dc18c9d8..a6947231e9a8 100644 --- a/playground/infrastructure/ci_cd.py +++ b/playground/infrastructure/ci_cd.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import asyncio import os diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/ci_helper.py index 773040ae35ee..2744c4d27ba7 100644 --- a/playground/infrastructure/ci_helper.py +++ b/playground/infrastructure/ci_helper.py @@ -38,7 +38,7 @@ async def verify_examples(self, examples: List[Example]): 2. Group code of examples by their SDK. 3. Run processing for all examples to verify examples' code. """ - get_statuses(examples) + await get_statuses(examples) await self._verify_examples_status(examples) async def _verify_examples_status(self, examples: List[Example]): diff --git a/playground/infrastructure/config.py b/playground/infrastructure/config.py index f520aec6ce2b..1c006be5d99e 100644 --- a/playground/infrastructure/config.py +++ b/playground/infrastructure/config.py @@ -28,6 +28,7 @@ class Config: SUPPORTED_SDK = {'java': SDK_JAVA, 'go': SDK_GO, 'py': SDK_PYTHON} BEAM_PLAYGROUND_TITLE = "Beam-playground:\n" BEAM_PLAYGROUND = "Beam-playground" + PAUSE_DELAY = 10 @dataclass(frozen=True) diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py index 0a24aaf08396..489bb615200e 100644 --- a/playground/infrastructure/helper.py +++ b/playground/infrastructure/helper.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import logging import os import yaml @@ -21,8 +22,10 @@ from typing import List from yaml import YAMLError from config import Config, TagFields -from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, Sdk +from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, Sdk, STATUS_VALIDATING, STATUS_PREPARING, \ + STATUS_COMPILING, STATUS_EXECUTING from collections import namedtuple +from grpc_client import GRPCClient Tag = namedtuple("Tag", [TagFields.NAME, TagFields.DESCRIPTION, TagFields.MULTIFILE, TagFields.CATEGORIES]) @@ -75,21 +78,18 @@ def find_examples(work_dir: str, supported_categories: List[str]) -> List[Exampl return examples -def get_statuses(examples: List[Example]): +async def get_statuses(examples: List[Example]): """ - Receive statuses for examples and update example.status and example.pipeline_id - - Use client to send requests to the backend: - 1. Start code processing. - 2. Ping the backend while status is STATUS_VALIDATING/STATUS_PREPARING/STATUS_COMPILING/STATUS_EXECUTING - Update example.pipeline_id with resulting pipelineId. - Update example.status with resulting status. + Receive status and update example.status and example.pipeline_id for each example Args: - examples: beam examples for processing and updating statuses. + examples: beam examples for processing and updating statuses and pipeline_id values. """ - # TODO [BEAM-13267] Implement - pass + tasks = [] + client = GRPCClient() + for example in examples: + tasks.append(_update_example_status(example, client)) + await asyncio.gather(*tasks) def get_tag(filepath): @@ -272,3 +272,25 @@ def _get_sdk(filename: str) -> Sdk: return Config.SUPPORTED_SDK[extension] else: raise ValueError(extension + " is not supported") + + +async def _update_example_status(example: Example, client: GRPCClient): + """ + Receive status for examples and update example.status and pipeline_id + + Use client to send requests to the backend: + 1. Start code processing. + 2. Ping the backend while status is STATUS_VALIDATING/STATUS_PREPARING/STATUS_COMPILING/STATUS_EXECUTING + Update example.status with resulting status. + + Args: + example: beam example for processing and updating status and pipeline_id. + client: client to send requests to the server. + """ + pipeline_id = await client.run_code(example.code, example.sdk) + example.pipeline_id = pipeline_id + status = await client.check_status(pipeline_id) + while status in [STATUS_VALIDATING, STATUS_PREPARING, STATUS_COMPILING, STATUS_EXECUTING]: + await asyncio.sleep(Config.PAUSE_DELAY) + status = await client.check_status(pipeline_id) + example.status = status diff --git a/playground/infrastructure/test_helper.py b/playground/infrastructure/test_helper.py index f3145806cc50..969a7875ee01 100644 --- a/playground/infrastructure/test_helper.py +++ b/playground/infrastructure/test_helper.py @@ -17,9 +17,11 @@ import pytest from unittest.mock import mock_open -from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, SDK_JAVA, SDK_PYTHON, SDK_GO -from helper import find_examples, Example, _get_example, _get_name, _get_sdk, get_tag, _validate, Tag, \ - get_supported_categories, _check_file +from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, SDK_JAVA, SDK_PYTHON, SDK_GO, STATUS_VALIDATING, \ + STATUS_FINISHED +from grpc_client import GRPCClient +from helper import find_examples, Example, _get_example, _get_name, _get_sdk, get_tag, _validate, Tag, get_statuses, \ + _update_example_status, get_supported_categories, _check_file @mock.patch('helper._check_file') @@ -49,6 +51,21 @@ def test_find_examples_with_invalid_tag(mock_os_walk, mock_check_file): mock_check_file.assert_called_once_with([], "file.java", "/root/file.java", []) +@pytest.mark.asyncio +@mock.patch('helper.GRPCClient') +@mock.patch('helper._update_example_status') +async def test_get_statuses(mock_update_example_status, mock_grpc_client): + example = Example("file", "pipeline_id", SDK_UNSPECIFIED, "root/file.extension", "code", "output", + STATUS_UNSPECIFIED, {"name": "Name"}) + client = None + + mock_grpc_client.return_value = client + + await get_statuses([example]) + + mock_update_example_status.assert_called_once_with(example, client) + + @mock.patch('builtins.open', mock_open(read_data="...\n# Beam-playground:\n# name: Name\n\nimport ...")) def test_get_tag_when_tag_is_exists(): result = get_tag("") @@ -183,3 +200,21 @@ def test__get_sdk_with_supported_extension(): def test__get_sdk_with_unsupported_extension(): with pytest.raises(ValueError, match="extension is not supported"): _get_sdk("filename.extension") + + +@pytest.mark.asyncio +@mock.patch('grpc_client.GRPCClient.check_status') +@mock.patch('grpc_client.GRPCClient.run_code') +async def test__update_example_status(mock_grpc_client_run_code, mock_grpc_client_check_status): + example = Example("file", "pipeline_id", SDK_UNSPECIFIED, "root/file.extension", "code", "output", + STATUS_UNSPECIFIED, {"name": "Name"}) + + mock_grpc_client_run_code.return_value = "pipeline_id" + mock_grpc_client_check_status.side_effect = [STATUS_VALIDATING, STATUS_FINISHED] + + await _update_example_status(example, GRPCClient()) + + assert example.pipeline_id == "pipeline_id" + assert example.status == STATUS_FINISHED + mock_grpc_client_run_code.assert_called_once_with(example.code, example.sdk) + mock_grpc_client_check_status.assert_has_calls([mock.call("pipeline_id")])