Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions playground/infrastructure/ci_cd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,31 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os

from cd_helper import CDHelper
from ci_helper import CIHelper
from helper import find_examples
from logger import setup_logger


def ci_step():
"""
CI step to verify all beam examples/tests/katas
"""
setup_logger()
root_dir = os.getenv("BEAM_ROOT_DIR")
ci_helper = CIHelper()
examples = find_examples(root_dir)
ci_helper.verify_examples(examples)
asyncio.run(ci_helper.verify_examples(examples))


def cd_step():
"""
CD step to save all beam examples/tests/katas and their outputs on the Google Cloud
"""
setup_logger()
root_dir = os.getenv("BEAM_ROOT_DIR")
cd_helper = CDHelper()
examples = find_examples(root_dir)
Expand Down
36 changes: 31 additions & 5 deletions playground/infrastructure/ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import List

from api.v1.api_pb2 import STATUS_COMPILE_ERROR, STATUS_ERROR, STATUS_RUN_ERROR, STATUS_RUN_TIMEOUT, \
STATUS_VALIDATION_ERROR, STATUS_PREPARATION_ERROR
from config import Config
from grpc_client import GRPCClient
from helper import Example, get_statuses


Expand All @@ -24,7 +30,7 @@ class CIHelper:
It is used to find and verify correctness if beam examples/katas/tests.
"""

def verify_examples(self, examples: List[Example]):
async def verify_examples(self, examples: List[Example]):
"""
Verify correctness of beam examples.

Expand All @@ -33,9 +39,9 @@ def verify_examples(self, examples: List[Example]):
3. Run processing for all examples to verify examples' code.
"""
get_statuses(examples)
self._verify_examples_status(examples)
await self._verify_examples_status(examples)

def _verify_examples_status(self, examples: List[Example]):
async def _verify_examples_status(self, examples: List[Example]):
"""
Verify statuses of beam examples.

Expand All @@ -47,5 +53,25 @@ def _verify_examples_status(self, examples: List[Example]):
Args:
examples: beam examples that should be verified
"""
# TODO [BEAM-13256] Implement
pass
client = GRPCClient()
verify_failed = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so I suppose somewhere around here we'll make calls to the service to actually verify all the examples, right? But I guess that will be a follow up change?

for example in examples:
if example.status not in Config.ERROR_STATUSES:
continue
if example.status == STATUS_VALIDATION_ERROR:
logging.error(f"Example: {example.pipeline_id} has validation error")
elif example.status == STATUS_PREPARATION_ERROR:
logging.error(f"Example: {example.pipeline_id} has preparation error")
elif example.status == STATUS_ERROR:
logging.error(f"Example: {example.pipeline_id} has error during setup run builder")
elif example.status == STATUS_RUN_TIMEOUT:
logging.error(f"Example: {example.pipeline_id} failed because of timeout")
elif example.status == STATUS_COMPILE_ERROR:
err = await client.get_compile_output(example.pipeline_id)
logging.error(f"Example: {example.pipeline_id} has compilation error: {err}")
elif example.status == STATUS_RUN_ERROR:
err = await client.get_run_error(example.pipeline_id)
logging.error(f"Example: {example.pipeline_id} has execution error: {err}")
verify_failed = True
if verify_failed:
raise Exception("CI step failed due to errors in the examples")
4 changes: 4 additions & 0 deletions playground/infrastructure/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@

import os
from dataclasses import dataclass
from api.v1.api_pb2 import STATUS_VALIDATION_ERROR, STATUS_ERROR, STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR, \
STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR

from api.v1.api_pb2 import SDK_JAVA


@dataclass(frozen=True)
class Config:
SERVER_ADDRESS = os.getenv("SERVER_ADDRESS", "localhost:8080")
ERROR_STATUSES = [STATUS_VALIDATION_ERROR, STATUS_ERROR, STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR,
STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR]
SUPPORTED_SDK = {'java': SDK_JAVA}
42 changes: 42 additions & 0 deletions playground/infrastructure/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys
from logging import INFO, WARNING, ERROR, CRITICAL


def setup_logger():
"""
Setup logging.

Add 2 handler in root logger:
StreamHandler - for logs(INFO and WARNING levels) to the stdout
StreamHandler - for logs(ERROR and CRITICAL levels) to the stderr
"""
log = logging.getLogger()
log.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s')

stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.addFilter(lambda record: record.levelno in (INFO, WARNING))
stdout_handler.setFormatter(formatter)

stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.addFilter(lambda record: record.levelno in (ERROR, CRITICAL))
stderr_handler.setFormatter(formatter)

log.addHandler(stdout_handler)
log.addHandler(stderr_handler)