Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 81 additions & 82 deletions tests/providers/alibaba/cloud/operators/test_oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@
# specific language governing permissions and limitations
# under the License.
#
import os
import unittest
from unittest import mock

import oss2

from airflow.exceptions import AirflowException
from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
from airflow.providers.alibaba.cloud.operators.oss import (
OSSCreateBucketOperator,
OSSDeleteBatchObjectOperator,
Expand All @@ -31,96 +27,99 @@
OSSDownloadObjectOperator,
OSSUploadObjectOperator,
)
from tests.providers.alibaba.cloud.utils.test_utils import skip_test_if_no_valid_conn_id

TEST_CONN_ID = os.environ.get('TEST_OSS_CONN_ID', 'oss_default')
TEST_REGION = os.environ.get('TEST_OSS_REGION', 'us-east-1')
TEST_BUCKET = os.environ.get('TEST_OSS_BUCKET', 'test-bucket')
TEST_FILE_PATH = '/tmp/airflow-test'
MOCK_TASK_ID = "test-oss-operator"
MOCK_REGION = "mock_region"
MOCK_BUCKET = "mock_bucket_name"
MOCK_OSS_CONN_ID = "mock_oss_conn_default"
MOCK_KEY = "mock_key"
MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"]
MOCK_CONTENT = "mock_content"


class TestOSSOperator(unittest.TestCase):
def setUp(self):
self.create_bucket_operator = OSSCreateBucketOperator(
oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-1'
)
self.delete_bucket_operator = OSSDeleteBucketOperator(
oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-2'
class TestOSSCreateBucketOperator(unittest.TestCase):
@mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
def test_execute(self, mock_hook):
operator = OSSCreateBucketOperator(
task_id=MOCK_TASK_ID, region=MOCK_REGION, bucket_name=MOCK_BUCKET, oss_conn_id=MOCK_OSS_CONN_ID
)
try:
self.hook = OSSHook(region=TEST_REGION)
self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET)
except AirflowException:
self.hook = None
except oss2.exceptions.ServerError as e:
if e.status == 403:
self.hook = None
operator.execute(None)
mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
mock_hook.return_value.create_bucket.assert_called_once_with(bucket_name=MOCK_BUCKET)

@skip_test_if_no_valid_conn_id
def test_init(self):
assert self.create_bucket_operator.oss_conn_id == TEST_CONN_ID

@skip_test_if_no_valid_conn_id
def test_create_delete_bucket(self):
self.create_bucket_operator.execute({})
self.delete_bucket_operator.execute({})
class TestOSSDeleteBucketOperator(unittest.TestCase):
@mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
def test_execute(self, mock_hook):
operator = OSSDeleteBucketOperator(
task_id=MOCK_TASK_ID, region=MOCK_REGION, bucket_name=MOCK_BUCKET, oss_conn_id=MOCK_OSS_CONN_ID
)
operator.execute(None)
mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
mock_hook.return_value.delete_bucket.assert_called_once_with(bucket_name=MOCK_BUCKET)

@skip_test_if_no_valid_conn_id
def test_object(self):
self.create_bucket_operator.execute({})

upload_file = f'{TEST_FILE_PATH}_upload_1'
if not os.path.exists(upload_file):
with open(upload_file, 'w') as f:
f.write('test')
upload_object_operator = OSSUploadObjectOperator(
key='obj',
file=upload_file,
oss_conn_id=TEST_CONN_ID,
region=TEST_REGION,
bucket_name=TEST_BUCKET,
task_id='task-1',
class TestOSSUploadObjectOperator(unittest.TestCase):
@mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
def test_execute(self, mock_hook):
operator = OSSUploadObjectOperator(
task_id=MOCK_TASK_ID,
region=MOCK_REGION,
bucket_name=MOCK_BUCKET,
oss_conn_id=MOCK_OSS_CONN_ID,
key=MOCK_KEY,
file=MOCK_CONTENT,
)
upload_object_operator.execute({})
assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET)

download_file = f'{TEST_FILE_PATH}_download_1'
download_object_operator = OSSDownloadObjectOperator(
key='obj',
file=download_file,
oss_conn_id=TEST_CONN_ID,
region=TEST_REGION,
bucket_name=TEST_BUCKET,
task_id='task-2',
operator.execute(None)
mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
mock_hook.return_value.upload_local_file.assert_called_once_with(
bucket_name=MOCK_BUCKET, key=MOCK_KEY, file=MOCK_CONTENT
)
download_object_operator.execute({})
assert os.path.exists(download_file)

delete_object_operator = OSSDeleteObjectOperator(
key='obj', oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-3'
)
delete_object_operator.execute({})
assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) is False

upload_object_operator = OSSUploadObjectOperator(
key='obj',
file=upload_file,
oss_conn_id=TEST_CONN_ID,
region=TEST_REGION,
bucket_name=TEST_BUCKET,
task_id='task-4',
class TestOSSDownloadObjectOperator(unittest.TestCase):
@mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
def test_execute(self, mock_hook):
operator = OSSDownloadObjectOperator(
task_id=MOCK_TASK_ID,
region=MOCK_REGION,
bucket_name=MOCK_BUCKET,
oss_conn_id=MOCK_OSS_CONN_ID,
key=MOCK_KEY,
file=MOCK_CONTENT,
)
upload_object_operator.execute({})
assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET)
operator.execute(None)
mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
mock_hook.return_value.download_file.assert_called_once_with(
bucket_name=MOCK_BUCKET, key=MOCK_KEY, local_file=MOCK_CONTENT
)


delete_objects_operator = OSSDeleteBatchObjectOperator(
keys=['obj'],
oss_conn_id=TEST_CONN_ID,
region=TEST_REGION,
bucket_name=TEST_BUCKET,
task_id='task-5',
class TestOSSDeleteBatchObjectOperator(unittest.TestCase):
@mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
def test_execute(self, mock_hook):
operator = OSSDeleteBatchObjectOperator(
task_id=MOCK_TASK_ID,
region=MOCK_REGION,
bucket_name=MOCK_BUCKET,
oss_conn_id=MOCK_OSS_CONN_ID,
keys=MOCK_KEYS,
)
delete_objects_operator.execute({})
assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) is False
operator.execute(None)
mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
mock_hook.return_value.delete_objects.assert_called_once_with(bucket_name=MOCK_BUCKET, key=MOCK_KEYS)

self.delete_bucket_operator.execute({})

class TestOSSDeleteObjectOperator(unittest.TestCase):
@mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
def test_execute(self, mock_hook):
operator = OSSDeleteObjectOperator(
task_id=MOCK_TASK_ID,
region=MOCK_REGION,
bucket_name=MOCK_BUCKET,
oss_conn_id=MOCK_OSS_CONN_ID,
key=MOCK_KEY,
)
operator.execute(None)
mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
mock_hook.return_value.delete_object.assert_called_once_with(bucket_name=MOCK_BUCKET, key=MOCK_KEY)
102 changes: 41 additions & 61 deletions tests/providers/alibaba/cloud/sensors/test_oss_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,78 +16,58 @@
# specific language governing permissions and limitations
# under the License.
#
import os
import unittest

import oss2
import unittest
from unittest import mock
from unittest.mock import PropertyMock

from airflow.exceptions import AirflowException
from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
from airflow.providers.alibaba.cloud.operators.oss import (
OSSCreateBucketOperator,
OSSDeleteBucketOperator,
OSSDeleteObjectOperator,
OSSUploadObjectOperator,
)
from airflow.providers.alibaba.cloud.sensors.oss_key import OSSKeySensor
from tests.providers.alibaba.cloud.utils.test_utils import skip_test_if_no_valid_conn_id

TEST_CONN_ID = os.environ.get('TEST_OSS_CONN_ID', 'oss_default')
TEST_REGION = os.environ.get('TEST_OSS_REGION', 'us-east-1')
TEST_BUCKET = os.environ.get('TEST_OSS_BUCKET', 'test-bucket')
TEST_FILE_PATH = '/tmp/airflow-test'
OSS_SENSOR_STRING = 'airflow.providers.alibaba.cloud.sensors.oss_key.{}'
MOCK_TASK_ID = "test-oss-operator"
MOCK_REGION = "mock_region"
MOCK_BUCKET = "mock_bucket_name"
MOCK_OSS_CONN_ID = "mock_oss_conn_default"
MOCK_KEY = "mock_key"
MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"]
MOCK_CONTENT = "mock_content"


class TestOSSSensor(unittest.TestCase):
class TestOSSKeySensor(unittest.TestCase):
def setUp(self):
self.sensor = OSSKeySensor(
bucket_key='obj',
oss_conn_id=TEST_CONN_ID,
region=TEST_REGION,
bucket_name=TEST_BUCKET,
task_id='task-1',
bucket_key=MOCK_KEY,
oss_conn_id=MOCK_OSS_CONN_ID,
region=MOCK_REGION,
bucket_name=MOCK_BUCKET,
task_id=MOCK_TASK_ID,
)
try:
self.hook = OSSHook(region=TEST_REGION, oss_conn_id=TEST_CONN_ID)
self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET)
except AirflowException:
self.hook = None
except oss2.exceptions.ServerError as e:
if e.status == 403:
self.hook = None

@skip_test_if_no_valid_conn_id
def test_init(self):
assert self.sensor.oss_conn_id == TEST_CONN_ID
@mock.patch(OSS_SENSOR_STRING.format("OSSHook"))
def test_get_hook(self, mock_service):
self.sensor.get_hook()
mock_service.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)

@skip_test_if_no_valid_conn_id
def test_poke(self):
create_bucket_operator = OSSCreateBucketOperator(
oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-2'
)
create_bucket_operator.execute({})
@mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock)
def test_poke_exsiting_key(self, mock_service):
# Given
mock_service.return_value.object_exists.return_value = True

upload_file = f'{TEST_FILE_PATH}_upload_1'
if not os.path.exists(upload_file):
with open(upload_file, 'w') as f:
f.write('test')
upload_object_operator = OSSUploadObjectOperator(
key='obj',
file=upload_file,
oss_conn_id=TEST_CONN_ID,
region=TEST_REGION,
bucket_name=TEST_BUCKET,
task_id='task-3',
)
upload_object_operator.execute({})
assert self.sensor.poke({})
# When
res = self.sensor.poke(None)

delete_object_operator = OSSDeleteObjectOperator(
key='obj', oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-4'
)
delete_object_operator.execute({})
# Then
assert res is True
mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)

delete_bucket_operator = OSSDeleteBucketOperator(
oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-5'
)
delete_bucket_operator.execute({})
@mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock)
def test_poke_non_exsiting_key(self, mock_service):
# Given
mock_service.return_value.object_exists.return_value = False

# When
res = self.sensor.poke(None)

# Then
assert res is False
mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)