From 982c3b60795d857dc5c06361d90a53c5ac942090 Mon Sep 17 00:00:00 2001 From: EricGao888 Date: Fri, 11 Mar 2022 17:20:43 +0800 Subject: [PATCH 1/2] Switch unit tests for oss operator in alibaba-provider to use mocks instead of real OSS (#17617) --- .../alibaba/cloud/operators/test_oss.py | 163 +++++++++--------- 1 file changed, 81 insertions(+), 82 deletions(-) diff --git a/tests/providers/alibaba/cloud/operators/test_oss.py b/tests/providers/alibaba/cloud/operators/test_oss.py index af4b2c550f909..ec556a592aed2 100644 --- a/tests/providers/alibaba/cloud/operators/test_oss.py +++ b/tests/providers/alibaba/cloud/operators/test_oss.py @@ -16,13 +16,9 @@ # specific language governing permissions and limitations # under the License. # -import os import unittest +from unittest import mock -import oss2 - -from airflow.exceptions import AirflowException -from airflow.providers.alibaba.cloud.hooks.oss import OSSHook from airflow.providers.alibaba.cloud.operators.oss import ( OSSCreateBucketOperator, OSSDeleteBatchObjectOperator, @@ -31,96 +27,99 @@ OSSDownloadObjectOperator, OSSUploadObjectOperator, ) -from tests.providers.alibaba.cloud.utils.test_utils import skip_test_if_no_valid_conn_id -TEST_CONN_ID = os.environ.get('TEST_OSS_CONN_ID', 'oss_default') -TEST_REGION = os.environ.get('TEST_OSS_REGION', 'us-east-1') -TEST_BUCKET = os.environ.get('TEST_OSS_BUCKET', 'test-bucket') -TEST_FILE_PATH = '/tmp/airflow-test' +MOCK_TASK_ID = "test-oss-operator" +MOCK_REGION = "mock_region" +MOCK_BUCKET = "mock_bucket_name" +MOCK_OSS_CONN_ID = "mock_oss_conn_default" +MOCK_KEY = "mock_key" +MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"] +MOCK_CONTENT = "mock_content" -class TestOSSOperator(unittest.TestCase): - def setUp(self): - self.create_bucket_operator = OSSCreateBucketOperator( - oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-1' - ) - self.delete_bucket_operator = OSSDeleteBucketOperator( - oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-2' +class TestOSSCreateBucketOperator(unittest.TestCase): + @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook") + def test_execute(self, mock_hook): + operator = OSSCreateBucketOperator( + task_id=MOCK_TASK_ID, region=MOCK_REGION, bucket_name=MOCK_BUCKET, oss_conn_id=MOCK_OSS_CONN_ID ) - try: - self.hook = OSSHook(region=TEST_REGION) - self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET) - except AirflowException: - self.hook = None - except oss2.exceptions.ServerError as e: - if e.status == 403: - self.hook = None + operator.execute(None) + mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION) + mock_hook.return_value.create_bucket.assert_called_once_with(bucket_name=MOCK_BUCKET) - @skip_test_if_no_valid_conn_id - def test_init(self): - assert self.create_bucket_operator.oss_conn_id == TEST_CONN_ID - @skip_test_if_no_valid_conn_id - def test_create_delete_bucket(self): - self.create_bucket_operator.execute({}) - self.delete_bucket_operator.execute({}) +class TestOSSDeleteBucketOperator(unittest.TestCase): + @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook") + def test_execute(self, mock_hook): + operator = OSSDeleteBucketOperator( + task_id=MOCK_TASK_ID, region=MOCK_REGION, bucket_name=MOCK_BUCKET, oss_conn_id=MOCK_OSS_CONN_ID + ) + operator.execute(None) + mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION) + mock_hook.return_value.delete_bucket.assert_called_once_with(bucket_name=MOCK_BUCKET) - @skip_test_if_no_valid_conn_id - def test_object(self): - self.create_bucket_operator.execute({}) - upload_file = f'{TEST_FILE_PATH}_upload_1' - if not os.path.exists(upload_file): - with open(upload_file, 'w') as f: - f.write('test') - upload_object_operator = OSSUploadObjectOperator( - key='obj', - file=upload_file, - oss_conn_id=TEST_CONN_ID, - region=TEST_REGION, - bucket_name=TEST_BUCKET, - task_id='task-1', +class TestOSSUploadObjectOperator(unittest.TestCase): + @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook") + def test_execute(self, mock_hook): + operator = OSSUploadObjectOperator( + task_id=MOCK_TASK_ID, + region=MOCK_REGION, + bucket_name=MOCK_BUCKET, + oss_conn_id=MOCK_OSS_CONN_ID, + key=MOCK_KEY, + file=MOCK_CONTENT, ) - upload_object_operator.execute({}) - assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) - - download_file = f'{TEST_FILE_PATH}_download_1' - download_object_operator = OSSDownloadObjectOperator( - key='obj', - file=download_file, - oss_conn_id=TEST_CONN_ID, - region=TEST_REGION, - bucket_name=TEST_BUCKET, - task_id='task-2', + operator.execute(None) + mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION) + mock_hook.return_value.upload_local_file.assert_called_once_with( + bucket_name=MOCK_BUCKET, key=MOCK_KEY, file=MOCK_CONTENT ) - download_object_operator.execute({}) - assert os.path.exists(download_file) - delete_object_operator = OSSDeleteObjectOperator( - key='obj', oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-3' - ) - delete_object_operator.execute({}) - assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) is False - upload_object_operator = OSSUploadObjectOperator( - key='obj', - file=upload_file, - oss_conn_id=TEST_CONN_ID, - region=TEST_REGION, - bucket_name=TEST_BUCKET, - task_id='task-4', +class TestOSSDownloadObjectOperator(unittest.TestCase): + @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook") + def test_execute(self, mock_hook): + operator = OSSDownloadObjectOperator( + task_id=MOCK_TASK_ID, + region=MOCK_REGION, + bucket_name=MOCK_BUCKET, + oss_conn_id=MOCK_OSS_CONN_ID, + key=MOCK_KEY, + file=MOCK_CONTENT, ) - upload_object_operator.execute({}) - assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) + operator.execute(None) + mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION) + mock_hook.return_value.download_file.assert_called_once_with( + bucket_name=MOCK_BUCKET, key=MOCK_KEY, local_file=MOCK_CONTENT + ) + - delete_objects_operator = OSSDeleteBatchObjectOperator( - keys=['obj'], - oss_conn_id=TEST_CONN_ID, - region=TEST_REGION, - bucket_name=TEST_BUCKET, - task_id='task-5', +class TestOSSDeleteBatchObjectOperator(unittest.TestCase): + @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook") + def test_execute(self, mock_hook): + operator = OSSDeleteBatchObjectOperator( + task_id=MOCK_TASK_ID, + region=MOCK_REGION, + bucket_name=MOCK_BUCKET, + oss_conn_id=MOCK_OSS_CONN_ID, + keys=MOCK_KEYS, ) - delete_objects_operator.execute({}) - assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) is False + operator.execute(None) + mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION) + mock_hook.return_value.delete_objects.assert_called_once_with(bucket_name=MOCK_BUCKET, key=MOCK_KEYS) - self.delete_bucket_operator.execute({}) + +class TestOSSDeleteObjectOperator(unittest.TestCase): + @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook") + def test_execute(self, mock_hook): + operator = OSSDeleteObjectOperator( + task_id=MOCK_TASK_ID, + region=MOCK_REGION, + bucket_name=MOCK_BUCKET, + oss_conn_id=MOCK_OSS_CONN_ID, + key=MOCK_KEY, + ) + operator.execute(None) + mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION) + mock_hook.return_value.delete_object.assert_called_once_with(bucket_name=MOCK_BUCKET, key=MOCK_KEY) From 0ab990c280a6cafd1f3d8387842d1857360cc75d Mon Sep 17 00:00:00 2001 From: EricGao888 Date: Fri, 11 Mar 2022 19:03:19 +0800 Subject: [PATCH 2/2] Switch unit tests for oss sensor in alibaba-provider to use mocks instead of real OSS (#17617) --- .../alibaba/cloud/sensors/test_oss_key.py | 102 +++++++----------- 1 file changed, 41 insertions(+), 61 deletions(-) diff --git a/tests/providers/alibaba/cloud/sensors/test_oss_key.py b/tests/providers/alibaba/cloud/sensors/test_oss_key.py index 1273e6d7cb1ec..48e98d9e73ae9 100644 --- a/tests/providers/alibaba/cloud/sensors/test_oss_key.py +++ b/tests/providers/alibaba/cloud/sensors/test_oss_key.py @@ -16,78 +16,58 @@ # specific language governing permissions and limitations # under the License. # -import os -import unittest -import oss2 +import unittest +from unittest import mock +from unittest.mock import PropertyMock -from airflow.exceptions import AirflowException -from airflow.providers.alibaba.cloud.hooks.oss import OSSHook -from airflow.providers.alibaba.cloud.operators.oss import ( - OSSCreateBucketOperator, - OSSDeleteBucketOperator, - OSSDeleteObjectOperator, - OSSUploadObjectOperator, -) from airflow.providers.alibaba.cloud.sensors.oss_key import OSSKeySensor -from tests.providers.alibaba.cloud.utils.test_utils import skip_test_if_no_valid_conn_id -TEST_CONN_ID = os.environ.get('TEST_OSS_CONN_ID', 'oss_default') -TEST_REGION = os.environ.get('TEST_OSS_REGION', 'us-east-1') -TEST_BUCKET = os.environ.get('TEST_OSS_BUCKET', 'test-bucket') -TEST_FILE_PATH = '/tmp/airflow-test' +OSS_SENSOR_STRING = 'airflow.providers.alibaba.cloud.sensors.oss_key.{}' +MOCK_TASK_ID = "test-oss-operator" +MOCK_REGION = "mock_region" +MOCK_BUCKET = "mock_bucket_name" +MOCK_OSS_CONN_ID = "mock_oss_conn_default" +MOCK_KEY = "mock_key" +MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"] +MOCK_CONTENT = "mock_content" -class TestOSSSensor(unittest.TestCase): +class TestOSSKeySensor(unittest.TestCase): def setUp(self): self.sensor = OSSKeySensor( - bucket_key='obj', - oss_conn_id=TEST_CONN_ID, - region=TEST_REGION, - bucket_name=TEST_BUCKET, - task_id='task-1', + bucket_key=MOCK_KEY, + oss_conn_id=MOCK_OSS_CONN_ID, + region=MOCK_REGION, + bucket_name=MOCK_BUCKET, + task_id=MOCK_TASK_ID, ) - try: - self.hook = OSSHook(region=TEST_REGION, oss_conn_id=TEST_CONN_ID) - self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET) - except AirflowException: - self.hook = None - except oss2.exceptions.ServerError as e: - if e.status == 403: - self.hook = None - @skip_test_if_no_valid_conn_id - def test_init(self): - assert self.sensor.oss_conn_id == TEST_CONN_ID + @mock.patch(OSS_SENSOR_STRING.format("OSSHook")) + def test_get_hook(self, mock_service): + self.sensor.get_hook() + mock_service.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION) - @skip_test_if_no_valid_conn_id - def test_poke(self): - create_bucket_operator = OSSCreateBucketOperator( - oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-2' - ) - create_bucket_operator.execute({}) + @mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock) + def test_poke_exsiting_key(self, mock_service): + # Given + mock_service.return_value.object_exists.return_value = True - upload_file = f'{TEST_FILE_PATH}_upload_1' - if not os.path.exists(upload_file): - with open(upload_file, 'w') as f: - f.write('test') - upload_object_operator = OSSUploadObjectOperator( - key='obj', - file=upload_file, - oss_conn_id=TEST_CONN_ID, - region=TEST_REGION, - bucket_name=TEST_BUCKET, - task_id='task-3', - ) - upload_object_operator.execute({}) - assert self.sensor.poke({}) + # When + res = self.sensor.poke(None) - delete_object_operator = OSSDeleteObjectOperator( - key='obj', oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-4' - ) - delete_object_operator.execute({}) + # Then + assert res is True + mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET) - delete_bucket_operator = OSSDeleteBucketOperator( - oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-5' - ) - delete_bucket_operator.execute({}) + @mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock) + def test_poke_non_exsiting_key(self, mock_service): + # Given + mock_service.return_value.object_exists.return_value = False + + # When + res = self.sensor.poke(None) + + # Then + assert res is False + mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)