Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions playground/infrastructure/cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pathlib import Path
from typing import List

from tqdm import tqdm
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add this tqdm as a dependency somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this dependency to the requirements list requirements.txt.

from google.cloud import storage

from api.v1.api_pb2 import Sdk
Expand All @@ -40,12 +41,18 @@ class CDHelper:

It is used to save beam examples/katas/tests and their output on the GCS.
"""

def store_examples(self, examples: List[Example]):
"""
Store beam examples and their output in the Google Cloud.
"""
logging.info("Start of executing Playground examples ...")
asyncio.run(self._get_outputs(examples))
logging.info("Finish of executing Playground examples")

logging.info("Start of sending Playground examples to the bucket ...")
self._save_to_cloud_storage(examples)
logging.info("Finish of sending Playground examples to the bucket")
self._clear_temp_folder()

async def _get_outputs(self, examples: List[Example]):
Expand All @@ -71,7 +78,7 @@ async def _get_outputs(self, examples: List[Example]):
example.output = output

for log, example in zip(logs, examples):
example.logs = log
example.logs = log

def _save_to_cloud_storage(self, examples: List[Example]):
"""
Expand All @@ -82,7 +89,7 @@ def _save_to_cloud_storage(self, examples: List[Example]):
"""
self._storage_client = storage.Client()
self._bucket = self._storage_client.bucket(Config.BUCKET_NAME)
for example in examples:
for example in tqdm(examples):
file_names = self._write_to_local_fs(example)
for cloud_file_name, local_file_name in file_names.items():
self._upload_blob(
Expand Down Expand Up @@ -117,10 +124,10 @@ def _write_to_local_fs(self, example: Example):
file_name=example.tag.name,
extension=PrecompiledExample.OUTPUT_EXTENSION)
log_path = self._get_gcs_object_name(
sdk=example.sdk,
base_folder_name=example.tag.name,
file_name=example.tag.name,
extension=PrecompiledExample.LOG_EXTENSION)
sdk=example.sdk,
base_folder_name=example.tag.name,
file_name=example.tag.name,
extension=PrecompiledExample.LOG_EXTENSION)
meta_path = self._get_gcs_object_name(
sdk=example.sdk,
base_folder_name=example.tag.name,
Expand Down
9 changes: 9 additions & 0 deletions playground/infrastructure/ci_cd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""
import argparse
import asyncio
import logging
import os
from typing import List

Expand Down Expand Up @@ -77,11 +78,19 @@ def _check_envs():

def _run_ci_cd(step: config.Config.CI_CD_LITERAL, sdk: Sdk):
supported_categories = get_supported_categories(categories_file)
logging.info("Start of searching Playground examples ...")
examples = find_examples(root_dir, supported_categories, sdk)
logging.info("Finish of searching Playground examples")
logging.info("Number of found Playground examples: %s", len(examples))

if step == config.Config.CI_STEP_NAME:
logging.info("Start of verification Playground examples ...")
_ci_step(examples=examples)
logging.info("Finish of verification Playground examples")
if step == config.Config.CD_STEP_NAME:
logging.info("Start of storing Playground examples ...")
_cd_step(examples=examples)
logging.info("Finish of storing Playground examples")


if __name__ == "__main__":
Expand Down
16 changes: 14 additions & 2 deletions playground/infrastructure/ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class CIHelper:

It is used to find and verify correctness if beam examples/katas/tests.
"""

async def verify_examples(self, examples: List[Example]):
"""
Verify correctness of beam examples.
Expand All @@ -62,10 +63,12 @@ async def _verify_examples_status(self, examples: List[Example]):
Args:
examples: beam examples that should be verified
"""
count_of_verified = 0
client = GRPCClient()
verify_failed = False
for example in examples:
if example.status not in Config.ERROR_STATUSES:
count_of_verified += 1
continue
if example.status == STATUS_VALIDATION_ERROR:
logging.error("Example: %s has validation error", example.filepath)
Expand All @@ -77,13 +80,22 @@ async def _verify_examples_status(self, examples: List[Example]):
elif example.status == STATUS_RUN_TIMEOUT:
logging.error("Example: %s failed because of timeout", example.filepath)
elif example.status == STATUS_COMPILE_ERROR:
err = await client.get_compile_output(example.filepath)
err = await client.get_compile_output(example.pipeline_id)
logging.error(
"Example: %s has compilation error: %s", example.filepath, err)
elif example.status == STATUS_RUN_ERROR:
err = await client.get_run_error(example.filepath)
err = await client.get_run_error(example.pipeline_id)
logging.error(
"Example: %s has execution error: %s", example.filepath, err)
verify_failed = True

logging.info(
"Number of verified Playground examples: %s / %s",
count_of_verified,
len(examples))
logging.info(
"Number of Playground examples with some error: %s / %s",
len(examples) - count_of_verified,
len(examples))
if verify_failed:
raise Exception("CI step failed due to errors in the examples")
1 change: 1 addition & 0 deletions playground/infrastructure/grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

class GRPCClient:
"""GRPCClient is gRPC client for sending a request to the backend."""

def __init__(self):
self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS)
self._stub = api_pb2_grpc.PlaygroundServiceStub(self._channel)
Expand Down
30 changes: 16 additions & 14 deletions playground/infrastructure/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import os
from collections import namedtuple
from dataclasses import dataclass, fields
from typing import List, Optional, Dict, Union
from typing import List, Optional, Dict

from tqdm.asyncio import tqdm
import yaml
from yaml import YAMLError

Expand Down Expand Up @@ -128,7 +129,7 @@ async def get_statuses(examples: List[Example]):
client = GRPCClient()
for example in examples:
tasks.append(_update_example_status(example, client))
await asyncio.gather(*tasks)
await tqdm.gather(*tasks)


def get_tag(filepath) -> Optional[ExampleTag]:
Expand All @@ -150,7 +151,8 @@ def get_tag(filepath) -> Optional[ExampleTag]:
lines = parsed_file.readlines()

for line in lines:
formatted_line = line.replace("//", "").replace("#", "")
formatted_line = line.replace("//", "").replace("#",
"").replace("\t", " ")
if add_to_yaml is False:
if formatted_line.lstrip() == Config.BEAM_PLAYGROUND_TITLE:
add_to_yaml = True
Expand Down Expand Up @@ -221,8 +223,7 @@ def get_supported_categories(categories_path: str) -> List[str]:
return yaml_object[TagFields.categories]


def _get_example(
filepath: str, filename: str, tag: ExampleTag) -> Example:
def _get_example(filepath: str, filename: str, tag: ExampleTag) -> Example:
"""
Return an Example by filepath and filename.

Expand Down Expand Up @@ -278,14 +279,15 @@ def _validate(tag: dict, supported_categories: List[str]) -> bool:
field.default,
tag.__str__())
valid = False

value = tag.get(field.default)
if value == "" or value is None:
logging.error(
"tag's value is incorrect: %s\nvalue for %s field can not be empty.",
tag.__str__(),
field.default.__str__())
valid = False
if valid is True:
value = tag.get(field.default)
if (value == "" or
value is None) and field.default != TagFields.pipeline_options:
logging.error(
"tag's value is incorrect: %s\n%s field can not be empty.",
tag.__str__(),
field.default.__str__())
valid = False

if valid is False:
return valid
Expand Down Expand Up @@ -353,7 +355,7 @@ async def _update_example_status(example: Example, client: GRPCClient):
client: client to send requests to the server.
"""
pipeline_id = await client.run_code(
example.code, example.sdk, example.tag[TagFields.pipeline_options])
example.code, example.sdk, example.tag.pipeline_options)
example.pipeline_id = pipeline_id
status = await client.check_status(pipeline_id)
while status in [STATUS_VALIDATING,
Expand Down
3 changes: 2 additions & 1 deletion playground/infrastructure/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ protobuf==3.19.1
pytest==6.2.5
pytest-mock==3.6.1
PyYAML==6.0
google-cloud-storage==1.43.0
google-cloud-storage==1.43.0
tqdm~=4.62.3
6 changes: 4 additions & 2 deletions playground/infrastructure/test_cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ def test__get_gcs_object_name():

def test__write_to_local_fs(delete_temp_folder):
"""
Test writing code of an example, output and meta info to the filesystem (in temp folder)
Test writing code of an example, output and meta info to
the filesystem (in temp folder)
Args:
delete_temp_folder: python fixture to clean up temp folder after method execution
delete_temp_folder: python fixture to clean up temp folder
after method execution
"""
object_meta = {
"name": "name",
Expand Down
1 change: 1 addition & 0 deletions playground/infrastructure/test_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def mock_get_compile_output(mocker):


class TestGRPCClient:

@pytest.mark.asyncio
async def test_run_code(self, mock_run_code):
result = await GRPCClient().run_code("", api_pb2.SDK_GO, "")
Expand Down
38 changes: 16 additions & 22 deletions playground/infrastructure/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, \
STATUS_VALIDATING, \
STATUS_FINISHED, SDK_JAVA, SDK_PYTHON, SDK_GO, \
STATUS_FINISHED, SDK_JAVA, \
PRECOMPILED_OBJECT_TYPE_EXAMPLE, PRECOMPILED_OBJECT_TYPE_KATA, \
PRECOMPILED_OBJECT_TYPE_UNIT_TEST
from grpc_client import GRPCClient
Expand All @@ -38,7 +38,7 @@ def test_find_examples_with_valid_tag(mock_os_walk, mock_check_file):
sdk = SDK_UNSPECIFIED
result = find_examples(work_dir="", supported_categories=[], sdk=sdk)

assert result == []
assert not result
mock_os_walk.assert_called_once_with("")
mock_check_file.assert_called_once_with(
examples=[],
Expand All @@ -56,9 +56,8 @@ def test_find_examples_with_invalid_tag(mock_os_walk, mock_check_file):
sdk = SDK_UNSPECIFIED
with pytest.raises(
ValueError,
match=
"Some of the beam examples contain beam playground tag with an incorrect format"
):
match="Some of the beam examples contain beam playground tag with "
"an incorrect format"):
find_examples("", [], sdk=sdk)

mock_os_walk.assert_called_once_with("")
Expand Down Expand Up @@ -171,22 +170,16 @@ def test_get_supported_categories():
@mock.patch("helper._get_name")
def test__get_example(mock_get_name):
mock_get_name.return_value = "filepath"
tag = ExampleTag(
{
"name": "Name",
"description": "Description",
"multifile": "False",
"categories": [""],
"pipeline_options": "--option option"
},
""
)

result = _get_example(
"/root/filepath.java",
"filepath.java",
tag
)
tag = ExampleTag({
"name": "Name",
"description": "Description",
"multifile": "False",
"categories": [""],
"pipeline_options": "--option option"
},
"")

result = _get_example("/root/filepath.java", "filepath.java", tag)

assert result == Example(
name="filepath",
Expand Down Expand Up @@ -284,7 +277,8 @@ async def test__update_example_status(

assert example.pipeline_id == "pipeline_id"
assert example.status == STATUS_FINISHED
mock_grpc_client_run_code.assert_called_once_with(example.code, example.sdk, "--key value")
mock_grpc_client_run_code.assert_called_once_with(
example.code, example.sdk, "--key value")
mock_grpc_client_check_status.assert_has_calls([mock.call("pipeline_id")])


Expand Down