From 1bdb9770e6d92bae4664e2f02b396a3486cf4429 Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Wed, 17 Apr 2019 12:41:56 -0500
Subject: [PATCH 01/12] Use assert_called_with for hook.wait in
 test_delete_cluster

---
 tests/gcp/operators/test_dataproc.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index 8a7f92bb868cd..b989c56f90469 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -699,7 +699,7 @@ def test_delete_cluster(self):
             projectId=GCP_PROJECT_ID,
             clusterName=CLUSTER_NAME,
             requestId=mock.ANY)
-        hook.wait.assert_called_once_with(self.operation)
+        hook.wait.assert_called_with(self.operation)

     def test_render_template(self):
         task = DataprocClusterDeleteOperator(

From f984939454887ab7d6f87397a3dc01a0cd57ff1b Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Mon, 20 May 2019 16:54:38 -0500
Subject: [PATCH 02/12] Test fixes

---
 tests/gcp/operators/test_dataproc.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index b989c56f90469..9e2cd488a667e 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -399,7 +399,8 @@ def test_create_cluster(self):
         self.operation = {'name': 'operation', 'done': True}
         self.mock_execute = Mock()
         self.mock_execute.execute.return_value = self.operation
-        self.mock_clusters = Mock()
+        # Clusters should be iterable
+        self.mock_clusters = [Mock()]
         self.mock_clusters.create.return_value = self.mock_execute
         self.mock_regions = Mock()
         self.mock_regions.clusters.return_value = self.mock_clusters

From 56726ecef4581d02ab4c3b7e6d850a6023e7a51d Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Tue, 21 May 2019 10:40:50 -0500
Subject: [PATCH 03/12] More test fixes

---
 tests/gcp/operators/test_dataproc.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index 9e2cd488a667e..28daa219eda32 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -401,7 +401,8 @@ def test_create_cluster(self):
         self.mock_execute.execute.return_value = self.operation
         # Clusters should be iterable
         self.mock_clusters = [Mock()]
-        self.mock_clusters.create.return_value = self.mock_execute
+        for m in self.mock_clusters:
+            m.create.return_value = self.mock_execute
         self.mock_regions = Mock()
         self.mock_regions.clusters.return_value = self.mock_clusters
         self.mock_projects = Mock()

From edf070c7b91275ecf14f1f2754bd7729031f15e1 Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Tue, 21 May 2019 12:04:45 -0500
Subject: [PATCH 04/12] Different test mock approach

---
 tests/gcp/operators/test_dataproc.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index 28daa219eda32..91ef6b8fd4ff7 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -399,10 +399,12 @@ def test_create_cluster(self):
         self.operation = {'name': 'operation', 'done': True}
         self.mock_execute = Mock()
         self.mock_execute.execute.return_value = self.operation
-        # Clusters should be iterable
-        self.mock_clusters = [Mock()]
-        for m in self.mock_clusters:
-            m.create.return_value = self.mock_execute
+        self.mock_list = Mock()
+        self.mock_list_execute = {}
+        self.mock_list.execute.return_value = self.mock_list_execute
+        self.mock_clusters = Mock()
+        self.mock_clusters.create.return_value = self.mock_execute
+        self.mock_clusters.list.return_value = self.mock_list
         self.mock_regions = Mock()
         self.mock_regions.clusters.return_value = self.mock_clusters

From 117235669efb8556c50bcbe2549b7dfc4e6b27ba Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Tue, 21 May 2019 15:20:24 -0500
Subject: [PATCH 05/12] Add requested test

---
 tests/gcp/operators/test_dataproc.py | 37 ++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index 91ef6b8fd4ff7..9234ea6d3ecb2 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -527,6 +527,43 @@ def test_create_cluster_with_multiple_masters(self):
              'labels': {'airflow-version': mock.ANY}})
         hook.wait.assert_called_once_with(self.operation)

+    def test_create_cluster_deletes_error_cluster(self):
+        # Setup service.projects().regions().clusters().create()
+        #       .execute()
+        self.operation = {'name': 'operation', 'done': True}
+        self.mock_execute = Mock()
+        self.mock_execute.execute.return_value = self.operation
+        self.mock_list = Mock()
+        self.mock_list_execute = {'clusters':[{'clusterName': CLUSTER_NAME,'status': {'state':'ERROR'}}]}
+        self.mock_list.execute.return_value = self.mock_list_execute
+        self.mock_clusters = Mock()
+        self.mock_clusters.create.return_value = self.mock_execute
+        self.mock_clusters.list.return_value = self.mock_list
+        self.mock_regions = Mock()
+        self.mock_regions.clusters.return_value = self.mock_clusters
+        self.mock_projects = Mock()
+        self.mock_projects.regions.return_value = self.mock_regions
+        self.mock_conn = Mock()
+        self.mock_conn.projects.return_value = self.mock_projects
+
+        with patch(HOOK) as MockHook:
+            hook = MockHook()
+            hook.get_conn.return_value = self.mock_conn
+            hook.wait.return_value = None
+
+            dataproc_task = DataprocClusterCreateOperator(
+                task_id=TASK_ID,
+                region=GCP_REGION,
+                cluster_name=CLUSTER_NAME,
+                project_id=GCP_PROJECT_ID,
+                num_workers=NUM_WORKERS,
+                zone=GCE_ZONE,
+                dag=self.dag
+            )
+            with patch.object(dataproc_task.log, 'info') as mock_info:
+                dataproc_task.execute(None)
+                mock_info.assert_any_call('Existing cluster in ERROR state, deleting it first')
+
     def test_build_cluster_data_internal_ip_only_without_subnetwork(self):

         def create_cluster_with_invalid_internal_ip_only_setup():

From 6ea5bec2fcfabf614f8754e76f4f45c4d090cac2 Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Tue, 21 May 2019 16:10:01 -0500
Subject: [PATCH 06/12] Fix whitespace

---
 tests/gcp/operators/test_dataproc.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index 9234ea6d3ecb2..54a1b389bf809 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -534,7 +534,7 @@ def test_create_cluster_deletes_error_cluster(self):
         self.mock_execute = Mock()
         self.mock_execute.execute.return_value = self.operation
         self.mock_list = Mock()
-        self.mock_list_execute = {'clusters':[{'clusterName': CLUSTER_NAME,'status': {'state':'ERROR'}}]}
+        self.mock_list_execute = {'clusters': [{'clusterName': CLUSTER_NAME, 'status': {'state': 'ERROR'}}]}
         self.mock_list.execute.return_value = self.mock_list_execute
         self.mock_clusters = Mock()
         self.mock_clusters.create.return_value = self.mock_execute

From ed0e24bccd16c6ff1947bec24ccfc20a03a11108 Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Fri, 16 Aug 2019 12:37:52 -0500
Subject: [PATCH 07/12] Stub final cluster state in test per PR feedback

---
 tests/gcp/operators/test_dataproc.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index 54a1b389bf809..ba8ca34a9b74c 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -550,6 +550,7 @@ def test_create_cluster_deletes_error_cluster(self):
             hook = MockHook()
             hook.get_conn.return_value = self.mock_conn
             hook.wait.return_value = None
+            hook._get_final_cluster_state.return_value = "ERROR"

             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,

From 1fc8fa5f42724b53533e2f05c477ef37933f0347 Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Fri, 13 Sep 2019 11:06:26 -0500
Subject: [PATCH 08/12] Rebase: add existing-cluster handling to Dataproc hook
 and operator

---
 airflow/gcp/hooks/dataproc.py     | 77 +++++++++++++++++++++++++++++++
 airflow/gcp/operators/dataproc.py | 72 ++++++++++++++++++++++++++++-
 2 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/airflow/gcp/hooks/dataproc.py b/airflow/gcp/hooks/dataproc.py
index cf846e1545af7..c620a212b3388 100644
--- a/airflow/gcp/hooks/dataproc.py
+++ b/airflow/gcp/hooks/dataproc.py
@@ -565,6 +565,83 @@ def cancel(self, project_id: str, job_id: str, region: str = 'global') -> Dict:
             jobId=job_id
         )

+    def _get_final_cluster_state(self, project_id, region, cluster_name, logger):
+        while True:
+            state = DataProcHook._get_cluster_state(self.get_conn(), project_id, region, cluster_name)
+            if state is None:
+                logger.info("No state for cluster '%s'", cluster_name)
+                time.sleep(15)
+            else:
+                logger.info("State for cluster '%s' is %s", cluster_name, state)
+                return state
+
+    @staticmethod
+    def _get_cluster_state(service, project_id, region, cluster_name):
+        cluster = DataProcHook._get_cluster(service, project_id, region, cluster_name)
+        if cluster and 'status' in cluster:
+            return cluster['status']['state']
+        else:
+            return None
+
+    @staticmethod
+    def _get_cluster(service, project_id, region, cluster_name):
+        cluster_list = DataProcHook._get_cluster_list_for_project(service, project_id, region)
+        cluster = [c for c in cluster_list if c['clusterName'] == cluster_name]
+        if cluster:
+            return cluster[0]
+        return None
+
+    @staticmethod
+    def _get_cluster_list_for_project(service, project_id, region):
+        result = service.projects().regions().clusters().list(
+            projectId=project_id,
+            region=region
+        ).execute()
+        return result.get('clusters', [])
+
+    @staticmethod
+    def _execute_dataproc_diagnose(service, project_id, region, cluster_name):
+        response = service.projects().regions().clusters().diagnose(
+            projectId=project_id,
+            region=region,
+            clusterName=cluster_name,
+            body={}
+        ).execute()
+        operation_name = response['name']
+        return operation_name
+
+    @staticmethod
+    def _execute_delete(service, project_id, region, cluster_name):
+        response = service.projects().regions().clusters().delete(
+            projectId=project_id,
+            region=region,
+            clusterName=cluster_name
+        ).execute(num_retries=5)
+        operation_name = response['name']
+        return operation_name
+
+    @staticmethod
+    # Return the response object when done
+    def _wait_for_operation_done(service, operation_name):
+        while True:
+            response = service.projects().regions().operations().get(
+                name=operation_name
+            ).execute(num_retries=5)
+
+            if response.get('done'):
+                return response
+            time.sleep(15)
+
+    @staticmethod
+    def _wait_for_operation_done_or_error(service, operation_name):
+
+        response = DataProcHook._wait_for_operation_done(service, operation_name)
+        if response.get('done'):
+            if 'error' in response:
+                raise AirflowException(str(response['error']))
+            else:
+                return
+

 setattr(
     DataProcHook,
diff --git a/airflow/gcp/operators/dataproc.py b/airflow/gcp/operators/dataproc.py
index bcbcfe48e0393..8473203eef43c 100644
--- a/airflow/gcp/operators/dataproc.py
+++ b/airflow/gcp/operators/dataproc.py
@@ -276,8 +276,36 @@ def __init__(self,
                 )
             ), "num_workers == 0 means single node mode - no preemptibles allowed"

+    def _cluster_ready(self, state, service):
+        if state == 'RUNNING':
+            return True
+        if state == 'DELETING':
+            raise Exception('Tried to create a cluster but it\'s in DELETING, something went wrong.')
+        if state == 'ERROR':
+            cluster = DataProcHook._get_cluster(service, self.project_id, self.region, self.cluster_name)
+            try:
+                error_details = cluster['status']['details']
+            except KeyError:
+                error_details = 'Unknown error in cluster creation, ' \
+                                'check Google Cloud console for details.'
+
+            self.log.info('Dataproc cluster creation resulted in an ERROR state, running diagnostics')
+            self.log.info(error_details)
+            diagnose_operation_name = \
+                DataProcHook._execute_dataproc_diagnose(service, self.project_id,
+                                                        self.region, self.cluster_name)
+            diagnose_result = DataProcHook._wait_for_operation_done(service, diagnose_operation_name)
+            if diagnose_result.get('response') and diagnose_result.get('response').get('outputUri'):
+                outputUri = diagnose_result.get('response').get('outputUri')
+                self.log.info('Diagnostic information for ERROR cluster available at [%s]', outputUri)
+            else:
+                self.log.info('Diagnostic information could not be retrieved!')
+
+            raise Exception(error_details)
+        return False
+
     def _get_init_action_timeout(self):
-        match = re.match(r"^(\d+)(s|m)$", self.init_action_timeout)
+        match = re.match(r"^(\d+)([sm])$", self.init_action_timeout)
         if match:
             if match.group(2) == "s":
                 return self.init_action_timeout
@@ -445,11 +473,51 @@ def _build_cluster_data(self):

         return cluster_data

+    def _usable_existing_cluster_present(self, service):
+        existing_cluster = DataProcHook._get_cluster(service, self.project_id, self.region, self.cluster_name)
+        if existing_cluster:
+            self.log.info(
+                'Cluster %s already exists... Checking status...',
+                self.cluster_name
+            )
+            existing_status = self.hook._get_final_cluster_state(service, self.project_id,
+                                                                 self.region, self.cluster_name, self.log)
+
+            if existing_status == 'RUNNING':
+                self.log.info('Cluster exists and is already running. Using it.')
+                return True
+
+            elif existing_status == 'DELETING':
+                while DataProcHook._get_cluster(service, self.project_id, self.region, self.cluster_name) \
+                        and DataProcHook._get_cluster_state(service, self.project_id,
+                                                            self.region, self.cluster_name) == 'DELETING':
+                    self.log.info('Existing cluster is deleting, waiting for it to finish')
+                    time.sleep(15)
+
+            elif existing_status == 'ERROR':
+                self.log.info('Existing cluster in ERROR state, deleting it first')
+
+                operation_name = DataProcHook._execute_delete(service, self.project_id,
+                                                              self.region, self.cluster_name)
+                self.log.info("Cluster delete operation name: %s", operation_name)
+                DataProcHook._wait_for_operation_done_or_error(service, operation_name)
+
+        return False
+
     def start(self):
         """
         Create a new cluster on Google Cloud Dataproc.
""" self.log.info('Creating cluster: %s', self.cluster_name) + hook = DataProcHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to + ) + service = hook.get_conn() + + if self._usable_existing_cluster_present(service): + return True + cluster_data = self._build_cluster_data() return ( @@ -542,7 +610,7 @@ def _build_scale_cluster_data(self): @staticmethod def _get_graceful_decommission_timeout(timeout): - match = re.match(r"^(\d+)(s|m|h|d)$", timeout) + match = re.match(r"^(\d+)([smdh])$", timeout) if match: if match.group(2) == "s": return timeout From 35d48c5e8a00e0201322a65e2ba9e1af9ecd8110 Mon Sep 17 00:00:00 2001 From: aniskodedossett Date: Fri, 13 Sep 2019 12:15:02 -0500 Subject: [PATCH 09/12] fix linter and patch paths --- airflow/gcp/operators/dataproc.py | 2 +- tests/gcp/operators/test_dataproc.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/gcp/operators/dataproc.py b/airflow/gcp/operators/dataproc.py index 8473203eef43c..e249be820937e 100644 --- a/airflow/gcp/operators/dataproc.py +++ b/airflow/gcp/operators/dataproc.py @@ -480,7 +480,7 @@ def _usable_existing_cluster_present(self, service): 'Cluster %s already exists... Checking status...', self.cluster_name ) - existing_status = self.hook._get_final_cluster_state(service, self.project_id, + existing_status = self.hook._get_final_cluster_state(self.project_id, self.region, self.cluster_name, self.log) if existing_status == 'RUNNING': diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py index ba8ca34a9b74c..99f4823bcc333 100644 --- a/tests/gcp/operators/test_dataproc.py +++ b/tests/gcp/operators/test_dataproc.py @@ -834,7 +834,7 @@ def setUp(self): schedule_interval='@daily') @mock.patch( - 'airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook.project_id', + 'airflow.gcp.hooks.dataproc.DataProcHook.project_id', new_callable=PropertyMock, return_value=GCP_PROJECT_ID ) @@ -919,7 +919,7 @@ def setUp(self): schedule_interval='@daily') @mock.patch( - 'airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook.project_id', + 'airflow.gcp.hooks.dataproc.DataProcHook.project_id', new_callable=PropertyMock, return_value=GCP_PROJECT_ID ) @@ -1004,7 +1004,7 @@ def setUp(self): schedule_interval='@daily') @mock.patch( - 'airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook.project_id', + 'airflow.gcp.hooks.dataproc.DataProcHook.project_id', new_callable=PropertyMock, return_value=GCP_PROJECT_ID ) @@ -1094,7 +1094,7 @@ def setUp(self): schedule_interval='@daily') @mock.patch( - 'airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook.project_id', + 'airflow.gcp.hooks.dataproc.DataProcHook.project_id', new_callable=PropertyMock, return_value=GCP_PROJECT_ID ) @@ -1182,7 +1182,7 @@ def setUp(self): schedule_interval='@daily') @mock.patch( - 'airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook.project_id', + 'airflow.gcp.hooks.dataproc.DataProcHook.project_id', new_callable=PropertyMock, return_value=GCP_PROJECT_ID ) From 1c2548e7358f4771d7bfd22fa8c1d0f2476841a2 Mon Sep 17 00:00:00 2001 From: aniskodedossett Date: Fri, 13 Sep 2019 13:47:58 -0500 Subject: [PATCH 10/12] More pylin --- airflow/gcp/hooks/dataproc.py | 22 +++++++++++----------- airflow/gcp/operators/dataproc.py | 26 +++++++++++++------------- tests/gcp/operators/test_dataproc.py | 4 ++-- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/airflow/gcp/hooks/dataproc.py b/airflow/gcp/hooks/dataproc.py index c620a212b3388..7fe26ef84f8ef 100644 --- a/airflow/gcp/hooks/dataproc.py +++ 
@@ -567,7 +567,7 @@ def cancel(self, project_id: str, job_id: str, region: str = 'global') -> Dict:
     def _get_final_cluster_state(self, project_id, region, cluster_name, logger):
         while True:
-            state = DataProcHook._get_cluster_state(self.get_conn(), project_id, region, cluster_name)
+            state = DataProcHook.get_cluster_state(self.get_conn(), project_id, region, cluster_name)
             if state is None:
                 logger.info("No state for cluster '%s'", cluster_name)
                 time.sleep(15)
@@ -576,23 +576,23 @@ def _get_final_cluster_state(self, project_id, region, cluster_name, logger):
                 return state

     @staticmethod
-    def _get_cluster_state(service, project_id, region, cluster_name):
-        cluster = DataProcHook._get_cluster(service, project_id, region, cluster_name)
+    def get_cluster_state(service, project_id, region, cluster_name):
+        cluster = DataProcHook.find_cluster(service, project_id, region, cluster_name)
         if cluster and 'status' in cluster:
             return cluster['status']['state']
         else:
             return None

     @staticmethod
-    def _get_cluster(service, project_id, region, cluster_name):
-        cluster_list = DataProcHook._get_cluster_list_for_project(service, project_id, region)
+    def find_cluster(service, project_id, region, cluster_name):
+        cluster_list = DataProcHook.get_cluster_list_for_project(service, project_id, region)
         cluster = [c for c in cluster_list if c['clusterName'] == cluster_name]
         if cluster:
             return cluster[0]
         return None

     @staticmethod
-    def _get_cluster_list_for_project(service, project_id, region):
+    def get_cluster_list_for_project(service, project_id, region):
         result = service.projects().regions().clusters().list(
             projectId=project_id,
             region=region
@@ -600,7 +600,7 @@ def get_cluster_list_for_project(service, project_id, region):
         return result.get('clusters', [])

     @staticmethod
-    def _execute_dataproc_diagnose(service, project_id, region, cluster_name):
+    def execute_dataproc_diagnose(service, project_id, region, cluster_name):
         response = service.projects().regions().clusters().diagnose(
             projectId=project_id,
             region=region,
@@ -611,7 +611,7 @@ def execute_dataproc_diagnose(service, project_id, region, cluster_name):
         return operation_name

     @staticmethod
-    def _execute_delete(service, project_id, region, cluster_name):
+    def execute_delete(service, project_id, region, cluster_name):
         response = service.projects().regions().clusters().delete(
             projectId=project_id,
             region=region,
@@ -622,7 +622,7 @@ def execute_delete(service, project_id, region, cluster_name):

     @staticmethod
     # Return the response object when done
-    def _wait_for_operation_done(service, operation_name):
+    def wait_for_operation_done(service, operation_name):
         while True:
             response = service.projects().regions().operations().get(
                 name=operation_name
@@ -633,9 +633,9 @@ def wait_for_operation_done(service, operation_name):
             time.sleep(15)

     @staticmethod
-    def _wait_for_operation_done_or_error(service, operation_name):
+    def wait_for_operation_done_or_error(service, operation_name):

-        response = DataProcHook._wait_for_operation_done(service, operation_name)
+        response = DataProcHook.wait_for_operation_done(service, operation_name)
         if response.get('done'):
             if 'error' in response:
                 raise AirflowException(str(response['error']))
             else:
diff --git a/airflow/gcp/operators/dataproc.py b/airflow/gcp/operators/dataproc.py
index e249be820937e..e2d498832ac0b 100644
--- a/airflow/gcp/operators/dataproc.py
+++ b/airflow/gcp/operators/dataproc.py
@@ -282,7 +282,7 @@ def _cluster_ready(self, state, service):
         if state == 'DELETING':
             raise Exception('Tried to create a cluster but it\'s in DELETING, something went wrong.')
         if state == 'ERROR':
-            cluster = DataProcHook._get_cluster(service, self.project_id, self.region, self.cluster_name)
+            cluster = DataProcHook.find_cluster(service, self.project_id, self.region, self.cluster_name)
             try:
                 error_details = cluster['status']['details']
             except KeyError:
@@ -292,12 +292,12 @@ def _cluster_ready(self, state, service):
             self.log.info('Dataproc cluster creation resulted in an ERROR state, running diagnostics')
             self.log.info(error_details)
             diagnose_operation_name = \
-                DataProcHook._execute_dataproc_diagnose(service, self.project_id,
-                                                        self.region, self.cluster_name)
-            diagnose_result = DataProcHook._wait_for_operation_done(service, diagnose_operation_name)
+                DataProcHook.execute_dataproc_diagnose(service, self.project_id,
+                                                       self.region, self.cluster_name)
+            diagnose_result = DataProcHook.wait_for_operation_done(service, diagnose_operation_name)
             if diagnose_result.get('response') and diagnose_result.get('response').get('outputUri'):
-                outputUri = diagnose_result.get('response').get('outputUri')
-                self.log.info('Diagnostic information for ERROR cluster available at [%s]', outputUri)
+                output_uri = diagnose_result.get('response').get('outputUri')
+                self.log.info('Diagnostic information for ERROR cluster available at [%s]', output_uri)
             else:
                 self.log.info('Diagnostic information could not be retrieved!')
@@ -474,7 +474,7 @@ def _build_cluster_data(self):

         return cluster_data

     def _usable_existing_cluster_present(self, service):
-        existing_cluster = DataProcHook._get_cluster(service, self.project_id, self.region, self.cluster_name)
+        existing_cluster = DataProcHook.find_cluster(service, self.project_id, self.region, self.cluster_name)
         if existing_cluster:
             self.log.info(
                 'Cluster %s already exists... Checking status...',
@@ -488,19 +488,19 @@ def _usable_existing_cluster_present(self, service):
                 return True

             elif existing_status == 'DELETING':
-                while DataProcHook._get_cluster(service, self.project_id, self.region, self.cluster_name) \
-                        and DataProcHook._get_cluster_state(service, self.project_id,
-                                                            self.region, self.cluster_name) == 'DELETING':
+                while DataProcHook.find_cluster(service, self.project_id, self.region, self.cluster_name) \
+                        and DataProcHook.get_cluster_state(service, self.project_id,
+                                                           self.region, self.cluster_name) == 'DELETING':
                     self.log.info('Existing cluster is deleting, waiting for it to finish')
                     time.sleep(15)

             elif existing_status == 'ERROR':
                 self.log.info('Existing cluster in ERROR state, deleting it first')

-                operation_name = DataProcHook._execute_delete(service, self.project_id,
-                                                              self.region, self.cluster_name)
+                operation_name = DataProcHook.execute_delete(service, self.project_id,
+                                                             self.region, self.cluster_name)
                 self.log.info("Cluster delete operation name: %s", operation_name)
-                DataProcHook._wait_for_operation_done_or_error(service, operation_name)
+                DataProcHook.wait_for_operation_done_or_error(service, operation_name)

         return False
diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index 99f4823bcc333..1c1156fa3e534 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -546,8 +546,8 @@ def test_create_cluster_deletes_error_cluster(self):
         self.mock_conn = Mock()
         self.mock_conn.projects.return_value = self.mock_projects

-        with patch(HOOK) as MockHook:
-            hook = MockHook()
+        with patch(HOOK) as mock_hook:
+            hook = mock_hook()
             hook.get_conn.return_value = self.mock_conn
             hook.wait.return_value = None
             hook._get_final_cluster_state.return_value = "ERROR"

From 3c94d0daa7b7185dd7523b09192a2ba8618a5fde Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Fri, 13 Sep 2019 15:00:15 -0500
Subject: [PATCH 11/12] Even more pylint

---
 airflow/gcp/hooks/dataproc.py        | 2 +-
 airflow/gcp/operators/dataproc.py    | 4 ++--
 tests/gcp/operators/test_dataproc.py | 3 ++-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/airflow/gcp/hooks/dataproc.py b/airflow/gcp/hooks/dataproc.py
index 7fe26ef84f8ef..b82b5df0afdc4 100644
--- a/airflow/gcp/hooks/dataproc.py
+++ b/airflow/gcp/hooks/dataproc.py
@@ -565,7 +565,7 @@ def cancel(self, project_id: str, job_id: str, region: str = 'global') -> Dict:
             jobId=job_id
         )

-    def _get_final_cluster_state(self, project_id, region, cluster_name, logger):
+    def get_final_cluster_state(self, project_id, region, cluster_name, logger):
         while True:
             state = DataProcHook.get_cluster_state(self.get_conn(), project_id, region, cluster_name)
             if state is None:
diff --git a/airflow/gcp/operators/dataproc.py b/airflow/gcp/operators/dataproc.py
index e2d498832ac0b..4910ccaa33abf 100644
--- a/airflow/gcp/operators/dataproc.py
+++ b/airflow/gcp/operators/dataproc.py
@@ -480,8 +480,8 @@ def _usable_existing_cluster_present(self, service):
                 'Cluster %s already exists... Checking status...',
                 self.cluster_name
             )
-            existing_status = self.hook._get_final_cluster_state(self.project_id,
-                                                                 self.region, self.cluster_name, self.log)
+            existing_status = self.hook.get_final_cluster_state(self.project_id,
+                                                                self.region, self.cluster_name, self.log)

             if existing_status == 'RUNNING':
                 self.log.info('Cluster exists and is already running. Using it.')
diff --git a/tests/gcp/operators/test_dataproc.py b/tests/gcp/operators/test_dataproc.py
index 1c1156fa3e534..cdddbc77ffd29 100644
--- a/tests/gcp/operators/test_dataproc.py
+++ b/tests/gcp/operators/test_dataproc.py
@@ -530,6 +530,7 @@ def test_create_cluster_with_multiple_masters(self):
     def test_create_cluster_deletes_error_cluster(self):
         # Setup service.projects().regions().clusters().create()
         #       .execute()
+        # pylint:disable=attribute-defined-outside-init
         self.operation = {'name': 'operation', 'done': True}
         self.mock_execute = Mock()
         self.mock_execute.execute.return_value = self.operation
@@ -550,7 +551,7 @@ def test_create_cluster_deletes_error_cluster(self):
             hook = mock_hook()
             hook.get_conn.return_value = self.mock_conn
             hook.wait.return_value = None
-            hook._get_final_cluster_state.return_value = "ERROR"
+            hook.get_final_cluster_state.return_value = "ERROR"

             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,

From 77fb07ae25dbc3af8e45fcb552f2ba33d4042cc2 Mon Sep 17 00:00:00 2001
From: aniskodedossett
Date: Mon, 16 Sep 2019 09:23:23 -0400
Subject: [PATCH 12/12] Yet more pylint

---
 airflow/gcp/hooks/dataproc.py | 64 +++++++++++++++++++++++++++++++++--
 1 file changed, 62 insertions(+), 2 deletions(-)

diff --git a/airflow/gcp/hooks/dataproc.py b/airflow/gcp/hooks/dataproc.py
index b82b5df0afdc4..75e4706243f6d 100644
--- a/airflow/gcp/hooks/dataproc.py
+++ b/airflow/gcp/hooks/dataproc.py
@@ -566,6 +566,15 @@ def cancel(self, project_id: str, job_id: str, region: str = 'global') -> Dict:
         )

     def get_final_cluster_state(self, project_id, region, cluster_name, logger):
+        """
+        Poll for the state of a cluster until one is available
+
+        :param project_id: ID of the Google Cloud project the cluster belongs to
+        :param region: Dataproc region of the cluster
+        :param cluster_name: name of the cluster to poll
+        :param logger: logger to report polling progress to
+        :return: the first state reported for the cluster
+        """
         while True:
             state = DataProcHook.get_cluster_state(self.get_conn(), project_id, region, cluster_name)
             if state is None:
@@ -577,6 +586,14 @@ def get_final_cluster_state(self, project_id, region, cluster_name, logger):

     @staticmethod
     def get_cluster_state(service, project_id, region, cluster_name):
+        """
+        Get the state of a cluster if it has one, otherwise None
+        :param service: authenticated Dataproc service object
+        :param project_id: ID of the Google Cloud project the cluster belongs to
+        :param region: Dataproc region of the cluster
+        :param cluster_name: name of the cluster
+        :return: the cluster's state, or None if it reports none
+        """
         cluster = DataProcHook.find_cluster(service, project_id, region, cluster_name)
         if cluster and 'status' in cluster:
             return cluster['status']['state']
@@ -585,6 +602,14 @@ def get_cluster_state(service, project_id, region, cluster_name):

     @staticmethod
     def find_cluster(service, project_id, region, cluster_name):
+        """
+        Retrieve a cluster from the project/region if it exists, otherwise None
+        :param service: authenticated Dataproc service object
+        :param project_id: ID of the Google Cloud project the cluster belongs to
+        :param region: Dataproc region of the cluster
+        :param cluster_name: name of the cluster to look up
+        :return: the cluster resource, or None if it does not exist
+        """
         cluster_list = DataProcHook.get_cluster_list_for_project(service, project_id, region)
         cluster = [c for c in cluster_list if c['clusterName'] == cluster_name]
         if cluster:
@@ -593,6 +618,13 @@ def find_cluster(service, project_id, region, cluster_name):

     @staticmethod
     def get_cluster_list_for_project(service, project_id, region):
+        """
+        List all clusters for a given project/region, an empty list if none exist
+        :param service: authenticated Dataproc service object
+        :param project_id: ID of the Google Cloud project to list clusters for
+        :param region: Dataproc region to list clusters in
+        :return: a list of cluster resources, possibly empty
+        """
         result = service.projects().regions().clusters().list(
             projectId=project_id,
             region=region
@@ -601,6 +633,15 @@ def get_cluster_list_for_project(service, project_id, region):

     @staticmethod
     def execute_dataproc_diagnose(service, project_id, region, cluster_name):
+        """
+        Execute the diagnose command against a given cluster, useful to get debugging
+        information if something has gone wrong or cluster creation failed.
+        :param service: authenticated Dataproc service object
+        :param project_id: ID of the Google Cloud project the cluster belongs to
+        :param region: Dataproc region of the cluster
+        :param cluster_name: name of the cluster to diagnose
+        :return: the name of the diagnose operation
+        """
         response = service.projects().regions().clusters().diagnose(
             projectId=project_id,
             region=region,
             clusterName=cluster_name,
             body={}
         ).execute()
@@ -612,6 +653,14 @@ def execute_dataproc_diagnose(service, project_id, region, cluster_name):

     @staticmethod
     def execute_delete(service, project_id, region, cluster_name):
+        """
+        Delete a specified cluster
+        :param service: authenticated Dataproc service object
+        :param project_id: ID of the Google Cloud project the cluster belongs to
+        :param region: Dataproc region of the cluster
+        :param cluster_name: name of the cluster to delete
+        :return: The identifier of the operation being executed
+        """
         response = service.projects().regions().clusters().delete(
             projectId=project_id,
             region=region,
             clusterName=cluster_name
         ).execute(num_retries=5)
@@ -621,8 +670,13 @@ def execute_delete(service, project_id, region, cluster_name):
         return operation_name

     @staticmethod
-    # Return the response object when done
     def wait_for_operation_done(service, operation_name):
+        """
+        Poll for the completion of a specific GCP operation
+        :param service: authenticated Dataproc service object
+        :param operation_name: name of the operation to poll
+        :return: The response object of the completed operation
+        """
         while True:
             response = service.projects().regions().operations().get(
                 name=operation_name
             ).execute(num_retries=5)
@@ -634,7 +688,13 @@ def wait_for_operation_done(service, operation_name):

     @staticmethod
     def wait_for_operation_done_or_error(service, operation_name):
-
+        """
+        Block until the specified operation is done. Raises an AirflowException
+        if the operation completed but had an error
+        :param service: authenticated Dataproc service object
+        :param operation_name: name of the operation to wait on
+        :return: None
+        """
         response = DataProcHook.wait_for_operation_done(service, operation_name)
         if response.get('done'):
             if 'error' in response:
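
Usage sketch (an editorial note, not one of the patches above): a minimal DAG showing how the existing-cluster handling added in this series surfaces to users of DataprocClusterCreateOperator. It assumes the airflow.gcp.* module layout used in these patches; the project, region, zone, and cluster names are illustrative placeholders.

    from datetime import datetime

    from airflow import DAG
    from airflow.gcp.operators.dataproc import DataprocClusterCreateOperator

    # If a cluster named 'example-cluster' already exists, the operator now
    # reacts to its state instead of failing outright:
    #   RUNNING  -> reuse the existing cluster and return immediately
    #   DELETING -> wait for the delete to finish, then create a new cluster
    #   ERROR    -> delete the broken cluster first, then create a new one
    # A creation attempt that itself ends in ERROR runs a diagnose operation
    # and logs the resulting outputUri before raising.
    with DAG(dag_id='dataproc_create_example',
             start_date=datetime(2019, 9, 1),
             schedule_interval=None) as dag:
        create_cluster = DataprocClusterCreateOperator(
            task_id='create_cluster',
            project_id='my-gcp-project',     # placeholder
            region='us-central1',            # placeholder
            cluster_name='example-cluster',  # placeholder
            num_workers=2,
            zone='us-central1-a',            # placeholder
        )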