From 22b58d2e5b6916b3b80436c4b0f75efec521a828 Mon Sep 17 00:00:00 2001 From: Georgi Date: Tue, 7 Sep 2021 22:13:35 +0300 Subject: [PATCH 1/2] Added initial code for RDS and made S3 more OOP (with Bucket objects) --- cloud/amazon/common/aws_util.py | 6 + cloud/amazon/common/base_service.py | 23 +- cloud/amazon/common/exception_handling.py | 11 + cloud/amazon/services/__init__.py | 1 + cloud/amazon/services/ec2/__init__.py | 2 +- cloud/amazon/services/ec2/{ec2.py => _ec2.py} | 8 +- cloud/amazon/services/rds/__init__.py | 1 + cloud/amazon/services/rds/_rds.py | 59 ++++ cloud/amazon/services/rds/_rds_instance.py | 0 cloud/amazon/services/s3/_bucket.py | 41 ++- cloud/amazon/services/s3/_s3.py | 268 +++++++++++++----- etl/__init__.py | 0 etl/db/__init__.py | 0 etl/db/mysql/__init__.py | 0 14 files changed, 322 insertions(+), 98 deletions(-) create mode 100755 cloud/amazon/common/aws_util.py rename cloud/amazon/services/ec2/{ec2.py => _ec2.py} (83%) create mode 100755 cloud/amazon/services/rds/__init__.py create mode 100755 cloud/amazon/services/rds/_rds.py create mode 100755 cloud/amazon/services/rds/_rds_instance.py create mode 100755 etl/__init__.py create mode 100755 etl/db/__init__.py create mode 100755 etl/db/mysql/__init__.py diff --git a/cloud/amazon/common/aws_util.py b/cloud/amazon/common/aws_util.py new file mode 100755 index 0000000..1a8f1fb --- /dev/null +++ b/cloud/amazon/common/aws_util.py @@ -0,0 +1,6 @@ +from types_extensions import dict_type + + +def extract_aws_response_status_code(resp: dict_type) -> int: + if meta_ := resp.get('ResponseMetadata'): + return meta_.get('HTTPStatusCode') diff --git a/cloud/amazon/common/base_service.py b/cloud/amazon/common/base_service.py index f78a77c..57703b2 100755 --- a/cloud/amazon/common/base_service.py +++ b/cloud/amazon/common/base_service.py @@ -11,7 +11,7 @@ from cloud.amazon.common.exception_handling import ExceptionLevels from cloud.amazon.common.service_availability import ServiceAvailability -from types_extensions import tuple_type +from types_extensions import tuple_type, safe_type, void class BaseAmazonService(abc.ABC): @@ -19,12 +19,22 @@ class BaseAmazonService(abc.ABC): _backend: boto3.Session = boto3.Session() _client: BaseClient default_exception_level: int = ExceptionLevels.RAISE - region: str = _backend.region_name + + def __init__(self, profile: str = None, region: str = None, default_exception_level: int = None) -> void: + + self._spawn_session(profile, region) + + if default_exception_level: + self.default_exception_level = default_exception_level @abc.abstractmethod def check_service_availability(self) -> int: raise NotImplementedError + @property + def region(self) -> str: + return self._backend.region_name + @property @abc.abstractmethod def name(self) -> str: @@ -46,6 +56,15 @@ def is_named(self) -> bool: hasattr(self, 'suffix') ) + def _spawn_session(self, profile_name: str, region_name: str) -> void: + session_kwargs = {} + if profile_name: + session_kwargs['profile_name'] = profile_name + if region_name: + session_kwargs['region_name'] = region_name + if len(session_kwargs) > 0: + self._backend = boto3.Session(**session_kwargs) + def is_connected(self) -> bool: return (self.check_service_availability() & ServiceAvailability.CONNECTED) == ServiceAvailability.CONNECTED diff --git a/cloud/amazon/common/exception_handling.py b/cloud/amazon/common/exception_handling.py index 7985d0a..6ca9561 100755 --- a/cloud/amazon/common/exception_handling.py +++ b/cloud/amazon/common/exception_handling.py @@ -23,5 +23,16 @@ def __str__(self) -> str: return msg +class MissingParametersException(Exception): + + def __init__(self, msg: str = None, parameter_names: list_type[str] = None) -> void: + self.msg = msg or 'Some parameters were missing.' + self.parameter_names = parameter_names or ['not given'] + + def __str__(self) -> str: + msg = f"{self.msg}\nMissing params: {', '.join(self.parameter_names)}" + return msg + + class BucketNotEmptyException(Exception): ... diff --git a/cloud/amazon/services/__init__.py b/cloud/amazon/services/__init__.py index 1c8ce1a..0992dc8 100755 --- a/cloud/amazon/services/__init__.py +++ b/cloud/amazon/services/__init__.py @@ -1,2 +1,3 @@ from .s3 import * from .ec2 import * +from .rds import * diff --git a/cloud/amazon/services/ec2/__init__.py b/cloud/amazon/services/ec2/__init__.py index d12a7e2..6903d88 100755 --- a/cloud/amazon/services/ec2/__init__.py +++ b/cloud/amazon/services/ec2/__init__.py @@ -1 +1 @@ -from .ec2 import AmazonEC2 +from ._ec2 import AmazonEC2 diff --git a/cloud/amazon/services/ec2/ec2.py b/cloud/amazon/services/ec2/_ec2.py similarity index 83% rename from cloud/amazon/services/ec2/ec2.py rename to cloud/amazon/services/ec2/_ec2.py index 0ec6851..789ed02 100755 --- a/cloud/amazon/services/ec2/ec2.py +++ b/cloud/amazon/services/ec2/_ec2.py @@ -12,13 +12,13 @@ class AmazonEC2(BaseAmazonService): def __init__(self, profile: str = None, region: str = None, default_exception_level: int = None, delimiter: str = '', instance_name_prefix: str = '', instance_name_suffix: str = '') -> void: - if profile: - self._backend: boto3.Session = boto3.Session(profile_name=profile) + + super().__init__(profile, region, default_exception_level) + self.prefix: str = instance_name_prefix self.suffix: str = instance_name_suffix self.delimiter: str = delimiter - self.default_exception_level: int = default_exception_level or ExceptionLevels.RAISE - self.region: str = region or self._backend.region_name + self._client: const(BaseClient) = self._backend.client(AWSServiceNameMapping.EC2, region_name=region) def check_service_availability(self) -> int: diff --git a/cloud/amazon/services/rds/__init__.py b/cloud/amazon/services/rds/__init__.py new file mode 100755 index 0000000..e839613 --- /dev/null +++ b/cloud/amazon/services/rds/__init__.py @@ -0,0 +1 @@ +from ._rds import AmazonRDS \ No newline at end of file diff --git a/cloud/amazon/services/rds/_rds.py b/cloud/amazon/services/rds/_rds.py new file mode 100755 index 0000000..c0c4081 --- /dev/null +++ b/cloud/amazon/services/rds/_rds.py @@ -0,0 +1,59 @@ +import warnings + +from botocore.client import BaseClient + +from cloud.amazon.common.aws_service_name_mapping import AWSServiceNameMapping +from cloud.amazon.common.base_service import BaseAmazonService +from cloud.amazon.common.exception_handling import ExceptionLevels +from cloud.amazon.common.service_availability import ServiceAvailability +from types_extensions import void, const, list_type, dict_type + + +class AmazonRDS(BaseAmazonService): + # plan: List instances, describe one instance, bring up, enable/disable, destroy, backup/replicate + + def __init__(self, profile: str = None, region: str = None, default_exception_level: int = None) -> void: + + super().__init__(profile, region, default_exception_level) + self._client: const(BaseClient) = self._backend.client(AWSServiceNameMapping.RDS, region_name=self.region) + + def check_service_availability(self) -> int: + # Not done, for now assumed RDS is up. + status = ServiceAvailability.OFFLINE + if 1 == 1: + status |= ServiceAvailability.ONLINE + if 2 == 2: + status |= ServiceAvailability.CONNECTED + return status + + @property + def name(self) -> str: + return AWSServiceNameMapping.RDS + + def list_db_instances(self, exception_level: int = None) -> list_type[str]: + if not self._assert_connection(exception_level): + return [] + try: + aws_resp = self._client.describe_db_instances() + return [instance_['DBInstanceIdentifier'] for instance_ in aws_resp['DBInstances']] + except Exception as e: + if exception_level == ExceptionLevels.RAISE: + raise + if exception_level == ExceptionLevels.WARN: + warnings.warn(f"An unknown exception occurred while trying to list databases:\nThe error is:\n{e}") + + def describe_db_instance(self, instance_name: str, exception_level: int = None) -> dict_type: + if not self._assert_connection(exception_level): + return {} + try: + aws_resp = self._client.describe_db_instances( + DBInstanceIdentifier=instance_name + ) + if instances := aws_resp.get('DBInstances'): + return instances[0] + except Exception as e: + if exception_level == ExceptionLevels.RAISE: + raise + if exception_level == ExceptionLevels.WARN: + warnings.warn(f"An unknown exception occurred while trying to:\n" + f"Describe: {instance_name}\nThe error is:\n{e}") diff --git a/cloud/amazon/services/rds/_rds_instance.py b/cloud/amazon/services/rds/_rds_instance.py new file mode 100755 index 0000000..e69de29 diff --git a/cloud/amazon/services/s3/_bucket.py b/cloud/amazon/services/s3/_bucket.py index 41b95c1..2790244 100755 --- a/cloud/amazon/services/s3/_bucket.py +++ b/cloud/amazon/services/s3/_bucket.py @@ -2,7 +2,7 @@ from cloud.amazon.common.base_service_generated_instance import BaseSGI from properties_and_methods import CachedProperty -from types_extensions import void, const, list_type, dict_type +from types_extensions import void, const, list_type, dict_type, safe_type class AmazonS3Bucket(BaseSGI): @@ -19,47 +19,60 @@ def set_exception_level(self, new_level: int) -> void: @CachedProperty def defaults(self): return dict( - bucket_name=self.bucket_name, + bucket_obj=self, apply_format_to_bucket=False, exception_level=self.exception_level, ) - def _build_default_params(self, kwargs_dict: dict_type[str, Any]) -> dict_type[str, Any]: - return {**kwargs_dict, **self.defaults} + def _build_default_params(self, **kwargs) -> dict_type[str, Any]: + return {**kwargs, **self.defaults} + + def create(self, acl: str = 'private', region: str = None, **kwargs) -> 'AmazonS3Bucket': + return self.parent.create_bucket( + **self._build_default_params(**kwargs), + acl=acl, + region=region + ) + + def delete(self, force_delete_contents: bool = False, **kwargs) -> void: + return self.parent.delete_bucket( + **self._build_default_params(**kwargs), + force_delete_contents=force_delete_contents + ) def get_objects(self, mode: str = 'mapping', **kwargs) -> dict: return self.parent.get_objects_in_bucket( - **self._build_default_params(kwargs), - mode=mode, + **self._build_default_params(**kwargs), + mode=mode ) def put_object(self, object_path: str, **kwargs) -> bool: return self.parent.put_object_in_bucket( - **self._build_default_params(kwargs), - object_path=object_path, + **self._build_default_params(**kwargs), + object_path=object_path ) def delete_object(self, object_name: str, **kwargs) -> void: return self.parent.delete_object_from_bucket( - **self._build_default_params(kwargs), - object_name=object_name, + **self._build_default_params(**kwargs), + object_name=object_name ) def delete_objects(self, object_names: list_type[str], **kwargs) -> void: return self.parent.delete_objects_from_bucket( - **self._build_default_params(kwargs), - object_names=object_names, + **self._build_default_params(**kwargs), + object_names=object_names ) def get_all_object_versions(self, object_name: str, **kwargs) -> list_type[dict_type[str, str]]: return self.parent.get_all_object_versions( - **self._build_default_params(kwargs), + **self._build_default_params(**kwargs), object_name=object_name ) def download_object(self, object_name: str, destination: str, **kwargs) -> void: return self.parent.download_object_from_bucket( - **self._build_default_params(kwargs), + **self._build_default_params(**kwargs), object_name=object_name, destination=destination ) diff --git a/cloud/amazon/services/s3/_s3.py b/cloud/amazon/services/s3/_s3.py index c221a9a..bd795db 100755 --- a/cloud/amazon/services/s3/_s3.py +++ b/cloud/amazon/services/s3/_s3.py @@ -1,14 +1,14 @@ import os import warnings +from typing import Any import botocore.exceptions from botocore import exceptions as aws_exceptions -from cloud.amazon.common.exception_handling import ExceptionLevels, InvalidArgumentException +from cloud.amazon.common.aws_util import extract_aws_response_status_code +from cloud.amazon.common.exception_handling import ExceptionLevels, InvalidArgumentException, MissingParametersException from types_extensions import const, safe_type, void, list_type, dict_type -import boto3 - from cloud.amazon.common.aws_service_name_mapping import AWSServiceNameMapping from cloud.amazon.common.base_service import BaseAmazonService, BaseClient from cloud.amazon.common.service_availability import ServiceAvailability @@ -21,17 +21,14 @@ class AmazonS3(BaseAmazonService): def __init__(self, profile: str = None, region: str = None, default_exception_level: int = None, default_storage_class: str = None, delimiter: str = '', bucket_prefix: str = '', bucket_suffix: str = '') -> void: - if profile: - self._backend: boto3.Session = boto3.Session(profile_name=profile) + super().__init__(profile, region, default_exception_level) + self.default_storage_class: str = default_storage_class or S3StorageClass.STANDARD - if default_exception_level: - self.default_exception_level = default_exception_level self.delimiter: str = delimiter self.prefix: str = bucket_prefix self.suffix: str = bucket_suffix - if region: - self.region: str = region + self._client: const(BaseClient) = self._backend.client(AWSServiceNameMapping.S3, region_name=self.region) @property @@ -47,7 +44,9 @@ def check_service_availability(self) -> int: status |= ServiceAvailability.CONNECTED return status - def get_buckets(self, *, exception_level: int = None) -> list_type[str]: + def list_buckets(self, exception_level: int = None) -> list_type[str]: + + exception_level = exception_level or self.default_exception_level if not self._assert_connection(exception_level): return [] try: @@ -59,12 +58,36 @@ def get_buckets(self, *, exception_level: int = None) -> list_type[str]: if exception_level == ExceptionLevels.WARN: warnings.warn(f"An unknown exception occurred while trying to list buckets:\nThe error is:\n{e}") - def spawn_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, - exception_level: int = None) -> safe_type(AmazonS3Bucket): + def list_buckets_as_objects(self, exception_level: int = None) -> list_type[AmazonS3Bucket]: + exception_level = exception_level or self.default_exception_level - ok, region, bucket_name = self._setup(exception_level=exception_level, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) + if not self._assert_connection(exception_level): + return [] + try: + aws_resp = self._client.list_buckets() + return [ + self.spawn_bucket_object( + bucket_name=bucket['Name'], + apply_format_to_bucket=False, + exception_level=exception_level + ) + for bucket in aws_resp['Buckets'] + ] + except Exception as e: + if exception_level == ExceptionLevels.RAISE: + raise + if exception_level == ExceptionLevels.WARN: + warnings.warn(f"An unknown exception occurred while trying to list bucket objects:\nThe error is:\n{e}") + + def spawn_bucket_object(self, bucket_name: str, apply_format_to_bucket: bool = True, + exception_level: int = None) -> safe_type(AmazonS3Bucket): + + exception_level = exception_level or self.default_exception_level + ok, region, bucket_name = self._setup( + exception_level=exception_level, + root_name=bucket_name, + build_full_name=apply_format_to_bucket + ) if not ok: return None @@ -74,16 +97,28 @@ def spawn_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, exception_level=exception_level ) - def create_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, - acl: str = 'private', region: str = None, lock_enabled: bool = False, - *, exception_level: int = None, **kwargs) -> safe_type(str): + def create_bucket(self, bucket_obj: AmazonS3Bucket = None, bucket_name: str = None, + apply_format_to_bucket: bool = True, acl: str = 'private', + region: str = None, lock_enabled: bool = False, exception_level: int = None, + **kwargs) -> safe_type(AmazonS3Bucket): + exception_level = exception_level or self.default_exception_level - ok, region, bucket_name = self._setup(exception_level=exception_level, - region=region, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) + if not bucket_obj and not bucket_name: + self._handle_missing_bucket_params(exception_level) + ok, region, bucket_name = self._setup( + exception_level=exception_level, + region=region, + root_name=bucket_name if not bucket_obj else bucket_obj.bucket_name, + build_full_name=apply_format_to_bucket if not bucket_obj else False + ) if not ok: return None + + bucket_obj = bucket_obj or self.spawn_bucket_object( + bucket_name=bucket_name, + apply_format_to_bucket=False, + exception_level=exception_level + ) try: aws_resp = self._client.create_bucket( ACL=acl, @@ -92,7 +127,8 @@ def create_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, ObjectLockEnabledForBucket=lock_enabled, **kwargs ) - return aws_resp['Location'] + if extract_aws_response_status_code(aws_resp) < 300: + return bucket_obj except (self._client.exceptions.BucketAlreadyExists, self._client.exceptions.BucketAlreadyOwnedByYou): @@ -100,7 +136,7 @@ def create_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, raise if exception_level == ExceptionLevels.WARN: warnings.warn(f"A bucket with the same name ({bucket_name}) and permissions already exists.") - return bucket_name + return bucket_obj except aws_exceptions.ClientError as e: if exception_level == ExceptionLevels.RAISE: @@ -115,32 +151,43 @@ def create_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, raise kw_ = "was not" rv = None - if bucket_name in self.get_buckets(exception_level=exception_level): + if bucket_name in self.list_buckets(exception_level=exception_level): kw_ = "was" - rv = bucket_name + rv = bucket_obj if exception_level == ExceptionLevels.WARN: warnings.warn(f"An unknown exception occurred. The bucket {kw_} created. The error is:\n{e}") return rv - def delete_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, force_delete_contents: bool = False, + def delete_bucket(self, bucket_obj: AmazonS3Bucket = None, bucket_name: str = None, + apply_format_to_bucket: bool = True, force_delete_contents: bool = False, exception_level: int = None, **kwargs) -> void: + exception_level = exception_level or self.default_exception_level - ok, _, bucket_name = self._setup(exception_level=exception_level, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) + if not bucket_obj and not bucket_name: + self._handle_missing_bucket_params(exception_level) + ok, _, bucket_name = self._setup( + exception_level=exception_level, + root_name=bucket_name if not bucket_obj else bucket_obj.bucket_name, + build_full_name=apply_format_to_bucket if not bucket_obj else False + ) if not ok: return if force_delete_contents: - contents = self.get_objects_in_bucket(bucket_name, - apply_format_to_bucket=False, - exception_level=exception_level - ) + contents = self.get_objects_in_bucket( + bucket_obj=bucket_obj, + bucket_name=bucket_name, + apply_format_to_bucket=False, + exception_level=exception_level + ) if contents: - self.delete_objects_from_bucket(bucket_name, - object_names=[val for val in contents.values()], - permanently=True, - apply_format_to_bucket=False, - exception_level=exception_level) + self.delete_objects_from_bucket( + bucket_obj=bucket_obj, + bucket_name=bucket_name, + object_names=[val for val in contents.values()], + permanently=True, + apply_format_to_bucket=False, + exception_level=exception_level + ) try: self._client.delete_bucket(Bucket=bucket_name, **kwargs) except botocore.exceptions.ClientError as e: @@ -159,19 +206,27 @@ def delete_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, f warnings.warn(f"An unknown exception occurred while trying to{force}:\n" f"Delete {bucket_name}\nThe error is:\n{e}") - def get_objects_in_bucket(self, bucket_name: str, apply_format_to_bucket: bool = True, - exception_level: int = None, mode: str = 'mapping', **kwargs) -> dict: + def get_objects_in_bucket(self, bucket_obj: AmazonS3Bucket = None, bucket_name: str = None, + apply_format_to_bucket: bool = True, exception_level: int = None, + mode: str = 'mapping', **kwargs) -> dict: + _allowed_modes = {'raw', 'mapping'} exception_level = exception_level or self.default_exception_level if mode not in _allowed_modes: raise InvalidArgumentException(mode, _allowed_modes) - ok, _, bucket_name = self._setup(exception_level=exception_level, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) + + if not bucket_obj and not bucket_name: + self._handle_missing_bucket_params(exception_level) + ok, _, bucket_name = self._setup( + exception_level=exception_level, + root_name=bucket_name if not bucket_obj else bucket_obj.bucket_name, + build_full_name=apply_format_to_bucket if not bucket_obj else False + ) if not ok: return {} + try: - if (bucket_objects := self._client.list_objects(Bucket=bucket_name, **kwargs)).get('Contents'): + if bucket_objects := self._client.list_objects(Bucket=bucket_name, **kwargs).get('Contents'): match mode: case x if x == 'raw': @@ -188,18 +243,25 @@ def get_objects_in_bucket(self, bucket_name: str, apply_format_to_bucket: bool = f"From {bucket_name}.\nThe error is:\n{e}") return {} - def put_object_in_bucket(self, bucket_name: str, object_path: str, apply_format_to_bucket: bool = True, - object_name: str = None, exception_level: int = None, storage_class: str = None, - acl: str = 'private', encryption: str = 'aws:kms', metadata: dict_type[str, str] = None, - **kwargs) -> bool: + def put_object_in_bucket(self, object_path: str, bucket_obj: AmazonS3Bucket = None, bucket_name: str = None, + apply_format_to_bucket: bool = True, object_name: str = None, exception_level: int = None, + storage_class: str = None, acl: str = 'private', encryption: str = 'aws:kms', + metadata: dict_type[str, str] = None, **kwargs) -> bool: + exception_level = exception_level or self.default_exception_level storage_class = storage_class or self.default_storage_class object_name = object_name or object_path - ok, _, bucket_name = self._setup(exception_level=exception_level, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) + + if not bucket_obj and not bucket_name: + self._handle_missing_bucket_params(exception_level) + ok, _, bucket_name = self._setup( + exception_level=exception_level, + root_name=bucket_name if not bucket_obj else bucket_obj.bucket_name, + build_full_name=apply_format_to_bucket if not bucket_obj else False + ) if not ok: return False + try: extra_args = { 'ACL': acl, @@ -229,15 +291,24 @@ def put_object_in_bucket(self, bucket_name: str, object_path: str, apply_format_ f"The error is:\n{e}") return False - def get_all_object_versions(self, bucket_name: str, object_name: str, match_exact: bool = True, - apply_format_to_bucket: bool = True, + def get_all_object_versions(self, object_name: str, bucket_obj: AmazonS3Bucket = None, bucket_name: str = None, + match_exact: bool = True, apply_format_to_bucket: bool = True, exception_level: int = None, **kwargs) -> list_type[dict_type[str, str]]: + rv = [] - ok, _, bucket_name = self._setup(exception_level=exception_level, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) + + if not bucket_obj and not bucket_name: + self._handle_missing_bucket_params(exception_level) + + ok, _, bucket_name = self._setup( + exception_level=exception_level, + root_name=bucket_name if not bucket_obj else bucket_obj.bucket_name, + build_full_name=apply_format_to_bucket if not bucket_obj else False + ) + if not ok: return rv + try: resp = self._client.list_object_versions( Bucket=bucket_name, @@ -258,18 +329,34 @@ def get_all_object_versions(self, bucket_name: str, object_name: str, match_exac f"For {object_name}\nIn {bucket_name}.\nThe error is:\n{e}") return rv - def delete_object_from_bucket(self, bucket_name: str, object_name: str, permanently: bool = False, - apply_format_to_bucket: bool = True, exception_level: int = None, **kwargs) -> void: + def delete_object_from_bucket(self, object_name: str, bucket_obj: AmazonS3Bucket = None, bucket_name: str = None, + permanently: bool = False, apply_format_to_bucket: bool = True, + exception_level: int = None, **kwargs) -> void: + exception_level = exception_level or self.default_exception_level - ok, _, bucket_name = self._setup(exception_level=exception_level, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) + + if not bucket_obj and not bucket_name: + self._handle_missing_bucket_params(exception_level) + + ok, _, bucket_name = self._setup( + exception_level=exception_level, + root_name=bucket_name if not bucket_obj else bucket_obj.bucket_name, + build_full_name=apply_format_to_bucket if not bucket_obj else False + ) + + if not ok: + return + try: if permanently: - all_versions = self.get_all_object_versions(bucket_name, object_name, - apply_format_to_bucket=False, - exception_level=exception_level, - **kwargs) + all_versions = self.get_all_object_versions( + object_name=object_name, + bucket_obj=bucket_obj, + bucket_name=bucket_name, + apply_format_to_bucket=False, + exception_level=exception_level, + **kwargs + ) if len(all_versions) > 0: self._client.delete_objects( Bucket=bucket_name, @@ -293,20 +380,31 @@ def delete_object_from_bucket(self, bucket_name: str, object_name: str, permanen warnings.warn(f"An unknown exception occurred while trying to{permanently}:\n" f"Delete {object_name}\nFrom {bucket_name}.\nThe error is:\n{e}") - def delete_objects_from_bucket(self, bucket_name: str, object_names: list_type[str], - permanently: bool = False, apply_format_to_bucket: bool = True, + def delete_objects_from_bucket(self, object_names: list_type[str], bucket_obj: AmazonS3Bucket = None, + bucket_name: str = None, permanently: bool = False, + apply_format_to_bucket: bool = True, exception_level: int = None, **kwargs) -> void: + exception_level = exception_level or self.default_exception_level - ok, _, bucket_name = self._setup(exception_level=exception_level, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) + + if not bucket_obj and not bucket_name: + self._handle_missing_bucket_params(exception_level) + + ok, _, bucket_name = self._setup( + exception_level=exception_level, + root_name=bucket_name if not bucket_obj else bucket_obj.bucket_name, + build_full_name=apply_format_to_bucket if not bucket_obj else False + ) + if not ok: return + try: objects_to_delete = [] for object_name in object_names: if permanently: all_versions = self.get_all_object_versions( + bucket_obj=bucket_obj, bucket_name=bucket_name, object_name=object_name, apply_format_to_bucket=False, @@ -333,13 +431,19 @@ def delete_objects_from_bucket(self, bucket_name: str, object_names: list_type[s warnings.warn(f"An unknown exception occurred while trying to{permanently} bulk:\n" f"Delete {object_names}\nFrom {bucket_name}.\nThe error is:\n{e}") - def download_object_from_bucket(self, bucket_name: str, object_name: str, destination: str, - apply_format_to_bucket: bool = True, exception_level: int = None, - **kwargs) -> void: + def download_object_from_bucket(self, object_name: str, destination: str, bucket_obj: AmazonS3Bucket = None, + bucket_name: str = None, apply_format_to_bucket: bool = True, + exception_level: int = None, **kwargs) -> void: + + if not bucket_obj and not bucket_name: + self._handle_missing_bucket_params(exception_level) + + ok, _, bucket_name = self._setup( + exception_level=exception_level, + root_name=bucket_name if not bucket_obj else bucket_obj.bucket_name, + build_full_name=apply_format_to_bucket if not bucket_obj else False + ) - ok, _, bucket_name = self._setup(exception_level=exception_level, - root_name=bucket_name, - build_full_name=apply_format_to_bucket) if not ok: return @@ -365,3 +469,13 @@ def download_object_from_bucket(self, bucket_name: str, object_name: str, destin finally: if buffer: buffer.close() + + def _handle_missing_bucket_params(self, exception_level: int = None, default_return_value: Any = None): + + exception_level = exception_level or self.default_exception_level + err_msg = f"You need to provide either an AmazonS3Bucket object or a bucket name" + if exception_level == ExceptionLevels.RAISE: + raise MissingParametersException(msg=err_msg, parameter_names=['bucket_obj', 'bucket_name']) + if exception_level == ExceptionLevels.WARN: + warnings.warn(err_msg) + return default_return_value diff --git a/etl/__init__.py b/etl/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/etl/db/__init__.py b/etl/db/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/etl/db/mysql/__init__.py b/etl/db/mysql/__init__.py new file mode 100755 index 0000000..e69de29 From b3a96931754361925b461dcea91f5c76edda2b83 Mon Sep 17 00:00:00 2001 From: Georgi Date: Sat, 12 Feb 2022 12:11:57 +0200 Subject: [PATCH 2/2] Fixes and some RDS base code --- cloud/amazon/common/aws_util.py | 1 + cloud/amazon/common/base_service.py | 2 +- .../common/base_service_generated_instance.py | 16 +++++++- cloud/amazon/common/exception_handling.py | 15 ++++++- cloud/amazon/services/rds/_rds_engine.py | 20 +++++++++ cloud/amazon/services/rds/_rds_instance.py | 41 +++++++++++++++++++ cloud/amazon/services/s3/_bucket.py | 17 ++++++-- cloud/amazon/services/s3/_s3.py | 28 ++++++++----- cloud/amazon/services/s3/exceptions.py | 4 ++ properties_and_methods/cached_property.py | 4 +- 10 files changed, 129 insertions(+), 19 deletions(-) create mode 100755 cloud/amazon/services/rds/_rds_engine.py create mode 100755 cloud/amazon/services/s3/exceptions.py diff --git a/cloud/amazon/common/aws_util.py b/cloud/amazon/common/aws_util.py index 1a8f1fb..656697e 100755 --- a/cloud/amazon/common/aws_util.py +++ b/cloud/amazon/common/aws_util.py @@ -4,3 +4,4 @@ def extract_aws_response_status_code(resp: dict_type) -> int: if meta_ := resp.get('ResponseMetadata'): return meta_.get('HTTPStatusCode') + return 404 diff --git a/cloud/amazon/common/base_service.py b/cloud/amazon/common/base_service.py index 57703b2..aa4ca9b 100755 --- a/cloud/amazon/common/base_service.py +++ b/cloud/amazon/common/base_service.py @@ -11,7 +11,7 @@ from cloud.amazon.common.exception_handling import ExceptionLevels from cloud.amazon.common.service_availability import ServiceAvailability -from types_extensions import tuple_type, safe_type, void +from types_extensions import tuple_type, void class BaseAmazonService(abc.ABC): diff --git a/cloud/amazon/common/base_service_generated_instance.py b/cloud/amazon/common/base_service_generated_instance.py index 4482bb7..1a99b27 100755 --- a/cloud/amazon/common/base_service_generated_instance.py +++ b/cloud/amazon/common/base_service_generated_instance.py @@ -5,5 +5,19 @@ class BaseSGI(metaclass=abc.ABCMeta): - def __init__(self, *, exception_level: int) -> void: + def __init__(self, *, parent, exception_level: int) -> void: self.exception_level: int = exception_level + self.parent = parent + + @classmethod + @abc.abstractmethod + def from_aws_response(cls, aws_resp: dict, parent, exception_level: int, **kwargs) -> 'BaseSGI': + raise NotImplementedError + + @abc.abstractmethod + def create(self, *args, **kwargs) -> 'BaseSGI': + raise NotImplementedError + + @abc.abstractmethod + def delete(self, *args, **kwargs) -> void: + raise NotImplementedError diff --git a/cloud/amazon/common/exception_handling.py b/cloud/amazon/common/exception_handling.py index 6ca9561..9b00775 100755 --- a/cloud/amazon/common/exception_handling.py +++ b/cloud/amazon/common/exception_handling.py @@ -34,5 +34,16 @@ def __str__(self) -> str: return msg -class BucketNotEmptyException(Exception): - ... +class InvalidAWSResponseException(Exception): + + def __init__(self, expected_dict_structures: list_type[list_type[str]]) -> void: + self.expected_dict_structures: list_type[str] = [ + f"aws_response[{']['.join([path_ for path_ in structure])}]" + for structure in expected_dict_structures + ] + + def __str__(self) -> str: + newl_ = "\n" + msg = f"The AWS Response you provided is invalid. The given dict requires the following paths:\n" \ + f"{newl_.join(self.expected_dict_structures)}" + return msg diff --git a/cloud/amazon/services/rds/_rds_engine.py b/cloud/amazon/services/rds/_rds_engine.py new file mode 100755 index 0000000..26e8fff --- /dev/null +++ b/cloud/amazon/services/rds/_rds_engine.py @@ -0,0 +1,20 @@ +from meta.config_meta import FinalConfigMeta +from types_extensions import const + + +class RDSEngine(metaclass=FinalConfigMeta): + + AURORA: const(str) = 'aurora' + AURORA_MYSQL: const(str) = 'aurora-mysql' + AURORA_POSTGRESQL: const(str) = 'aurora-postgresql' + MARIADB: const(str) = 'mariadb' + MYSQL: const(str) = 'mysql' + ORACLE_EE: const(str) = 'oracle-ee' + ORACLE_EE_CDB: const(str) = 'oracle-ee-cdb' + ORACLE_SE2: const(str) = 'oracle-se2' + ORACLE_SE2_CDB: const(str) = 'oracle-se2-cdb' + POSTGRES: const(str) = 'postgres' + SQLSERVER_EE: const(str) = 'sqlserver-ee' + SQLSERVER_SE: const(str) = 'sqlserver-se' + SQLSERVER_EX: const(str) = 'sqlserver-ex' + SQLSERVER_WEB: const(str) = 'sqlserver-web' diff --git a/cloud/amazon/services/rds/_rds_instance.py b/cloud/amazon/services/rds/_rds_instance.py index e69de29..03cce95 100755 --- a/cloud/amazon/services/rds/_rds_instance.py +++ b/cloud/amazon/services/rds/_rds_instance.py @@ -0,0 +1,41 @@ +from typing import Any + +from cloud.amazon.common.base_service_generated_instance import BaseSGI +from properties_and_methods import CachedProperty +from types_extensions import void, const, list_type, dict_type + + +class AmazonRDSInstance(BaseSGI): + + def __init__(self, db_name: str, db_instance_identifier: str, db_instance_class: str, db_engine: str, + db_port: int, parent, exception_level: int) -> void: + super().__init__(parent=parent, exception_level=exception_level) + self.db_name: const(str) = db_name + self.db_instance_identifier: const(str) = db_instance_identifier + self.db_instance_class: const(str) = db_instance_class + self.db_engine: const(str) = db_engine + self.db_port: const(int) = db_port + + def set_exception_level(self, new_level: int) -> void: + self.exception_level = new_level + self.invalidate_cached_property_defaults() + + @classmethod + def from_aws_response(cls, aws_resp: dict, parent, exception_level: int, **kwargs) -> 'AmazonRDSInstance': + ... + + @CachedProperty + def defaults(self): + return dict( + rds_obj=self, + exception_level=self.exception_level, + ) + + def _build_default_params(self, **kwargs) -> dict_type[str, Any]: + return {**self.defaults, **kwargs} + + def create(self, acl: str = 'private', region: str = None, **kwargs) -> 'AmazonRDSInstance': + ... + + def delete(self, force_delete_contents: bool = False, **kwargs) -> void: + ... diff --git a/cloud/amazon/services/s3/_bucket.py b/cloud/amazon/services/s3/_bucket.py index 2790244..a126d02 100755 --- a/cloud/amazon/services/s3/_bucket.py +++ b/cloud/amazon/services/s3/_bucket.py @@ -1,16 +1,27 @@ from typing import Any from cloud.amazon.common.base_service_generated_instance import BaseSGI +from cloud.amazon.common.exception_handling import InvalidAWSResponseException from properties_and_methods import CachedProperty -from types_extensions import void, const, list_type, dict_type, safe_type +from types_extensions import void, const, list_type, dict_type class AmazonS3Bucket(BaseSGI): def __init__(self, bucket_name: str, parent, exception_level: int) -> void: - super().__init__(exception_level=exception_level) + super().__init__(parent=parent, exception_level=exception_level) self.bucket_name: const(str) = bucket_name - self.parent = parent + + @classmethod + def from_aws_response(cls, aws_resp: dict, parent, exception_level: int, **kwargs) -> 'AmazonS3Bucket': + try: + return AmazonS3Bucket( + bucket_name=aws_resp['Name'], + parent=parent, + exception_level=exception_level + ) + except KeyError: + raise InvalidAWSResponseException([['Name']]) def set_exception_level(self, new_level: int) -> void: self.exception_level = new_level diff --git a/cloud/amazon/services/s3/_s3.py b/cloud/amazon/services/s3/_s3.py index bd795db..33cfbff 100755 --- a/cloud/amazon/services/s3/_s3.py +++ b/cloud/amazon/services/s3/_s3.py @@ -2,11 +2,12 @@ import warnings from typing import Any -import botocore.exceptions from botocore import exceptions as aws_exceptions from cloud.amazon.common.aws_util import extract_aws_response_status_code -from cloud.amazon.common.exception_handling import ExceptionLevels, InvalidArgumentException, MissingParametersException +from cloud.amazon.common.exception_handling import ExceptionLevels, InvalidArgumentException, \ + MissingParametersException +from cloud.amazon.services.s3.exceptions import BucketNotEmptyException from types_extensions import const, safe_type, void, list_type, dict_type from cloud.amazon.common.aws_service_name_mapping import AWSServiceNameMapping @@ -172,13 +173,15 @@ def delete_bucket(self, bucket_obj: AmazonS3Bucket = None, bucket_name: str = No ) if not ok: return + + contents = self.get_objects_in_bucket( + bucket_obj=bucket_obj, + bucket_name=bucket_name, + apply_format_to_bucket=False, + exception_level=exception_level + ) + if force_delete_contents: - contents = self.get_objects_in_bucket( - bucket_obj=bucket_obj, - bucket_name=bucket_name, - apply_format_to_bucket=False, - exception_level=exception_level - ) if contents: self.delete_objects_from_bucket( bucket_obj=bucket_obj, @@ -188,9 +191,12 @@ def delete_bucket(self, bucket_obj: AmazonS3Bucket = None, bucket_name: str = No apply_format_to_bucket=False, exception_level=exception_level ) + else: + if contents: + raise BucketNotEmptyException try: self._client.delete_bucket(Bucket=bucket_name, **kwargs) - except botocore.exceptions.ClientError as e: + except aws_exceptions.ClientError as e: if exception_level == ExceptionLevels.RAISE: raise e = str(e) @@ -462,10 +468,12 @@ def download_object_from_bucket(self, object_name: str, destination: str, bucket except Exception as e: if exception_level == ExceptionLevels.RAISE: + if buffer: + buffer.close() raise if exception_level == ExceptionLevels.WARN: warnings.warn(f"An unknown exception occurred while trying to:\n" - f"Download {object_name}\nFrom {bucket_name}\nTo {buffer}\nThe error is:\n{e}") + f"Download {object_name}\nFrom {bucket_name}\nTo {destination}\nThe error is:\n{e}") finally: if buffer: buffer.close() diff --git a/cloud/amazon/services/s3/exceptions.py b/cloud/amazon/services/s3/exceptions.py new file mode 100755 index 0000000..920f1da --- /dev/null +++ b/cloud/amazon/services/s3/exceptions.py @@ -0,0 +1,4 @@ + + +class BucketNotEmptyException(Exception): + ... diff --git a/properties_and_methods/cached_property.py b/properties_and_methods/cached_property.py index fe04eab..6033203 100755 --- a/properties_and_methods/cached_property.py +++ b/properties_and_methods/cached_property.py @@ -25,7 +25,6 @@ class _Missing(metaclass=SingletonMeta): def __init__(self, wrapped: Callable) -> void: super().__init__(wrapped) self.cached_var: Any = self._Missing() - self.invalidate_method_name = self.build_invalidate_method_name(wrapped.__name__) @staticmethod def build_invalidate_method_name(property_name: str) -> str: @@ -37,7 +36,8 @@ def __get__(self, obj: type, klass: type = None) -> Any: if self.cached_var is not self._Missing(): return self.cached_var result = self.cached_var = self._func(obj) - setattr(obj, self.invalidate_method_name, self.invalidate) + invalidate_method_name = self.build_invalidate_method_name(self._func.__name__) + setattr(obj, invalidate_method_name, self.invalidate) return result def invalidate(self) -> void: