diff --git a/playground/infrastructure/ci_cd.py b/playground/infrastructure/ci_cd.py index 23312c63878b..1400cc951345 100644 --- a/playground/infrastructure/ci_cd.py +++ b/playground/infrastructure/ci_cd.py @@ -12,28 +12,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import asyncio import os from cd_helper import CDHelper from ci_helper import CIHelper from helper import find_examples +from logger import setup_logger def ci_step(): """ CI step to verify all beam examples/tests/katas """ + setup_logger() root_dir = os.getenv("BEAM_ROOT_DIR") ci_helper = CIHelper() examples = find_examples(root_dir) - ci_helper.verify_examples(examples) + asyncio.run(ci_helper.verify_examples(examples)) def cd_step(): """ CD step to save all beam examples/tests/katas and their outputs on the Google Cloud """ + setup_logger() root_dir = os.getenv("BEAM_ROOT_DIR") cd_helper = CDHelper() examples = find_examples(root_dir) diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/ci_helper.py index 2822b732f6e8..773040ae35ee 100644 --- a/playground/infrastructure/ci_helper.py +++ b/playground/infrastructure/ci_helper.py @@ -13,7 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging from typing import List + +from api.v1.api_pb2 import STATUS_COMPILE_ERROR, STATUS_ERROR, STATUS_RUN_ERROR, STATUS_RUN_TIMEOUT, \ + STATUS_VALIDATION_ERROR, STATUS_PREPARATION_ERROR +from config import Config +from grpc_client import GRPCClient from helper import Example, get_statuses @@ -24,7 +30,7 @@ class CIHelper: It is used to find and verify correctness if beam examples/katas/tests. """ - def verify_examples(self, examples: List[Example]): + async def verify_examples(self, examples: List[Example]): """ Verify correctness of beam examples. @@ -33,9 +39,9 @@ def verify_examples(self, examples: List[Example]): 3. Run processing for all examples to verify examples' code. """ get_statuses(examples) - self._verify_examples_status(examples) + await self._verify_examples_status(examples) - def _verify_examples_status(self, examples: List[Example]): + async def _verify_examples_status(self, examples: List[Example]): """ Verify statuses of beam examples. @@ -47,5 +53,25 @@ def _verify_examples_status(self, examples: List[Example]): Args: examples: beam examples that should be verified """ - # TODO [BEAM-13256] Implement - pass + client = GRPCClient() + verify_failed = False + for example in examples: + if example.status not in Config.ERROR_STATUSES: + continue + if example.status == STATUS_VALIDATION_ERROR: + logging.error(f"Example: {example.pipeline_id} has validation error") + elif example.status == STATUS_PREPARATION_ERROR: + logging.error(f"Example: {example.pipeline_id} has preparation error") + elif example.status == STATUS_ERROR: + logging.error(f"Example: {example.pipeline_id} has error during setup run builder") + elif example.status == STATUS_RUN_TIMEOUT: + logging.error(f"Example: {example.pipeline_id} failed because of timeout") + elif example.status == STATUS_COMPILE_ERROR: + err = await client.get_compile_output(example.pipeline_id) + logging.error(f"Example: {example.pipeline_id} has compilation error: {err}") + elif example.status == STATUS_RUN_ERROR: + err = await client.get_run_error(example.pipeline_id) + logging.error(f"Example: {example.pipeline_id} has execution error: {err}") + verify_failed = True + if verify_failed: + raise Exception("CI step failed due to errors in the examples") diff --git a/playground/infrastructure/config.py b/playground/infrastructure/config.py index 33e1ed7bfbaf..79ad1ceaad80 100644 --- a/playground/infrastructure/config.py +++ b/playground/infrastructure/config.py @@ -15,6 +15,8 @@ import os from dataclasses import dataclass +from api.v1.api_pb2 import STATUS_VALIDATION_ERROR, STATUS_ERROR, STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR, \ + STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR from api.v1.api_pb2 import SDK_JAVA @@ -22,4 +24,6 @@ @dataclass(frozen=True) class Config: SERVER_ADDRESS = os.getenv("SERVER_ADDRESS", "localhost:8080") + ERROR_STATUSES = [STATUS_VALIDATION_ERROR, STATUS_ERROR, STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR, + STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR] SUPPORTED_SDK = {'java': SDK_JAVA} diff --git a/playground/infrastructure/logger.py b/playground/infrastructure/logger.py new file mode 100644 index 000000000000..f8c0568c45dd --- /dev/null +++ b/playground/infrastructure/logger.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sys +from logging import INFO, WARNING, ERROR, CRITICAL + + +def setup_logger(): + """ + Setup logging. + + Add 2 handler in root logger: + StreamHandler - for logs(INFO and WARNING levels) to the stdout + StreamHandler - for logs(ERROR and CRITICAL levels) to the stderr + """ + log = logging.getLogger() + log.setLevel(logging.INFO) + formatter = logging.Formatter('[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s') + + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.addFilter(lambda record: record.levelno in (INFO, WARNING)) + stdout_handler.setFormatter(formatter) + + stderr_handler = logging.StreamHandler(sys.stderr) + stderr_handler.addFilter(lambda record: record.levelno in (ERROR, CRITICAL)) + stderr_handler.setFormatter(formatter) + + log.addHandler(stdout_handler) + log.addHandler(stderr_handler)