From f7403ed931055c5c7dc1d1cbcdd7129961783914 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Mon, 2 May 2022 18:32:41 -0500 Subject: [PATCH 01/13] Added console web url in case of dataflow failed job --- .../apache_beam/runners/dataflow/dataflow_runner.py | 12 +++++++++--- .../runners/dataflow/test_dataflow_runner.py | 5 +++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 53cc48b335cb..85c2b74be7cd 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1608,6 +1608,9 @@ def is_in_terminal_state(self): return PipelineState.is_terminal(self._get_job_state()) def wait_until_finish(self, duration=None): + consoleUrl = """More info in:'https://console.cloud.google.com/ + dataflow/jobs//{}?project='""".format(self.job_id) + if not self.is_in_terminal_state(): if not self.has_job: raise IOError('Failed to get the Dataflow job id.') @@ -1628,14 +1631,17 @@ def wait_until_finish(self, duration=None): # is_in_terminal_state. terminated = self.is_in_terminal_state() assert duration or terminated, ( - 'Job did not reach to a terminal state after waiting indefinitely.') + 'Job did not reach to a terminal state after waiting indefinitely. %s'% + (consoleUrl)) if terminated and self.state != PipelineState.DONE: # TODO(BEAM-1290): Consider converting this to an error log based on # theresolution of the issue. + _LOGGER.error(consoleUrl) raise DataflowRuntimeException( - 'Dataflow pipeline failed. State: %s, Error:\n%s' % - (self.state, getattr(self._runner, 'last_error_msg', None)), + 'Dataflow pipeline failed. State: %s, Error:\n%s\n%s' % + (self.state, getattr(self._runner, 'last_error_msg', None), + consoleUrl), self) return self.state diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index d4743a558f3e..35e93014e0fb 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -39,6 +39,7 @@ class TestDataflowRunner(DataflowRunner): + console_url = "https://console.cloud.google.com/dataflow/jobs/" def run_pipeline(self, pipeline, options): """Execute test pipeline and verify test matcher""" test_options = options.view_as(TestOptions) @@ -55,6 +56,8 @@ def run_pipeline(self, pipeline, options): # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. print('Worker logs: %s' % self.build_console_url(options)) + self.console_url = self.build_console_url(options) + _LOGGER.info('Console log: {}'.format(self.console_url)) try: self.wait_until_in_state(PipelineState.RUNNING) @@ -85,6 +88,7 @@ def build_console_url(self, options): def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): """Wait until Dataflow pipeline enters a certain state.""" if not self.result.has_job: + _LOGGER.error("Console log: {}".format(self.console_url)) raise IOError('Failed to get the Dataflow job id.') start_time = time.time() @@ -94,6 +98,7 @@ def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): return job_state time.sleep(5) + _LOGGER.error("Console log: {}".format(self.console_url)) raise RuntimeError( 'Timeout after %d seconds while waiting for job %s ' 'enters expected state %s. Current state is %s.' % From 67efc9716f7dbef51b2d2c7266c43cc92c3e8d50 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Tue, 3 May 2022 13:33:33 -0500 Subject: [PATCH 02/13] Added error in dataflow exception --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 85c2b74be7cd..22b776e039fc 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1639,9 +1639,9 @@ def wait_until_finish(self, duration=None): # theresolution of the issue. _LOGGER.error(consoleUrl) raise DataflowRuntimeException( - 'Dataflow pipeline failed. State: %s, Error:\n%s\n%s' % - (self.state, getattr(self._runner, 'last_error_msg', None), - consoleUrl), + '%s\nDataflow pipeline failed. State: %s, Error:\n%s' % + (consoleUrl, self.state, + getattr(self._runner, 'last_error_msg', None)), self) return self.state From 513cda5a5a432d543a671bcb775adc947ec82ce9 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Tue, 3 May 2022 16:11:46 -0500 Subject: [PATCH 03/13] Python format --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 22b776e039fc..90770edb2bcd 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1641,7 +1641,7 @@ def wait_until_finish(self, duration=None): raise DataflowRuntimeException( '%s\nDataflow pipeline failed. State: %s, Error:\n%s' % (consoleUrl, self.state, - getattr(self._runner, 'last_error_msg', None)), + getattr(self._runner, 'last_error_msg', None)), self) return self.state From d43def5b0a1a4c57f92a0dd37a02ca79b1cc2622 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Tue, 3 May 2022 18:56:50 -0500 Subject: [PATCH 04/13] Python format --- .../runners/dataflow/dataflow_runner.py | 14 ++++++-------- .../runners/dataflow/test_dataflow_runner.py | 13 ++++++------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 90770edb2bcd..180d5cb2bd3c 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1609,8 +1609,7 @@ def is_in_terminal_state(self): def wait_until_finish(self, duration=None): consoleUrl = """More info in:'https://console.cloud.google.com/ - dataflow/jobs//{}?project='""".format(self.job_id) - + dataflow/jobs//{}?project='""".format(self.job_id) if not self.is_in_terminal_state(): if not self.has_job: raise IOError('Failed to get the Dataflow job id.') @@ -1631,17 +1630,16 @@ def wait_until_finish(self, duration=None): # is_in_terminal_state. terminated = self.is_in_terminal_state() assert duration or terminated, ( - 'Job did not reach to a terminal state after waiting indefinitely. %s'% - (consoleUrl)) + 'Job did not reach to a terminal state after waiting indefinitely.' + 'Console log: {}'.format(consoleUrl)) if terminated and self.state != PipelineState.DONE: # TODO(BEAM-1290): Consider converting this to an error log based on # theresolution of the issue. - _LOGGER.error(consoleUrl) + _LOGGER.error('Console URL {}'.format(consoleUrl)) raise DataflowRuntimeException( - '%s\nDataflow pipeline failed. State: %s, Error:\n%s' % - (consoleUrl, self.state, - getattr(self._runner, 'last_error_msg', None)), + 'Dataflow pipeline failed. State: %s, Error:\n%s' % + (self.state, getattr(self._runner, 'last_error_msg', None)), self) return self.state diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index 35e93014e0fb..f11672b9535f 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -39,7 +39,6 @@ class TestDataflowRunner(DataflowRunner): - console_url = "https://console.cloud.google.com/dataflow/jobs/" def run_pipeline(self, pipeline, options): """Execute test pipeline and verify test matcher""" test_options = options.view_as(TestOptions) @@ -56,8 +55,7 @@ def run_pipeline(self, pipeline, options): # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. print('Worker logs: %s' % self.build_console_url(options)) - self.console_url = self.build_console_url(options) - _LOGGER.info('Console log: {}'.format(self.console_url)) + _LOGGER.info('Console log: {}'.format(self.build_console_url(options))) try: self.wait_until_in_state(PipelineState.RUNNING) @@ -87,8 +85,10 @@ def build_console_url(self, options): def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): """Wait until Dataflow pipeline enters a certain state.""" + consoleUrl = """https://console.cloud.google.com/dataflow/jobs/ + /{}?project=""".format(self.result.job_id) if not self.result.has_job: - _LOGGER.error("Console log: {}".format(self.console_url)) + _LOGGER.error("Console log: {}".format(consoleUrl)) raise IOError('Failed to get the Dataflow job id.') start_time = time.time() @@ -97,9 +97,8 @@ def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): if self.result.is_in_terminal_state() or job_state == expected_state: return job_state time.sleep(5) - - _LOGGER.error("Console log: {}".format(self.console_url)) + _LOGGER.error('Console URL {}'.format(consoleUrl)) raise RuntimeError( 'Timeout after %d seconds while waiting for job %s ' 'enters expected state %s. Current state is %s.' % - (timeout, self.result.job_id(), expected_state, self.result.state)) + (timeout, self.result.job_id(), expected_state, self.result.state)) \ No newline at end of file From 14bc5eecb7e1c3ee0ef6bc6bdec797c70b3a3148 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Tue, 3 May 2022 19:00:12 -0500 Subject: [PATCH 05/13] Python format --- .../apache_beam/runners/dataflow/test_dataflow_runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index f11672b9535f..c7f3adcc79b8 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -101,4 +101,5 @@ def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): raise RuntimeError( 'Timeout after %d seconds while waiting for job %s ' 'enters expected state %s. Current state is %s.' % - (timeout, self.result.job_id(), expected_state, self.result.state)) \ No newline at end of file + (timeout, self.result.job_id(), expected_state, self.result.state)) + \ No newline at end of file From 267b2d51f759c8cf7012632e1c2963b9d417307b Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Tue, 3 May 2022 20:05:59 -0500 Subject: [PATCH 06/13] Fix python pylint wrning --- .../apache_beam/runners/dataflow/dataflow_runner.py | 6 +++--- .../runners/dataflow/test_dataflow_runner.py | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 180d5cb2bd3c..df1676095c3f 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1608,7 +1608,7 @@ def is_in_terminal_state(self): return PipelineState.is_terminal(self._get_job_state()) def wait_until_finish(self, duration=None): - consoleUrl = """More info in:'https://console.cloud.google.com/ + consoleUrl = """Console URL: 'https://console.cloud.google.com/ dataflow/jobs//{}?project='""".format(self.job_id) if not self.is_in_terminal_state(): if not self.has_job: @@ -1631,12 +1631,12 @@ def wait_until_finish(self, duration=None): terminated = self.is_in_terminal_state() assert duration or terminated, ( 'Job did not reach to a terminal state after waiting indefinitely.' - 'Console log: {}'.format(consoleUrl)) + '{}'.format(consoleUrl)) if terminated and self.state != PipelineState.DONE: # TODO(BEAM-1290): Consider converting this to an error log based on # theresolution of the issue. - _LOGGER.error('Console URL {}'.format(consoleUrl)) + _LOGGER.error(consoleUrl) raise DataflowRuntimeException( 'Dataflow pipeline failed. State: %s, Error:\n%s' % (self.state, getattr(self._runner, 'last_error_msg', None)), diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index c7f3adcc79b8..d0b1b10913a9 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -55,7 +55,8 @@ def run_pipeline(self, pipeline, options): # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. print('Worker logs: %s' % self.build_console_url(options)) - _LOGGER.info('Console log: {}'.format(self.build_console_url(options))) + _LOGGER.info('Console log: ') + _LOGGER.info(self.build_console_url(options)) try: self.wait_until_in_state(PipelineState.RUNNING) @@ -85,10 +86,10 @@ def build_console_url(self, options): def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): """Wait until Dataflow pipeline enters a certain state.""" - consoleUrl = """https://console.cloud.google.com/dataflow/jobs/ + consoleUrl = """Console URL:https://console.cloud.google.com/dataflow/jobs/ /{}?project=""".format(self.result.job_id) if not self.result.has_job: - _LOGGER.error("Console log: {}".format(consoleUrl)) + _LOGGER.error(consoleUrl) raise IOError('Failed to get the Dataflow job id.') start_time = time.time() @@ -97,9 +98,8 @@ def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): if self.result.is_in_terminal_state() or job_state == expected_state: return job_state time.sleep(5) - _LOGGER.error('Console URL {}'.format(consoleUrl)) + _LOGGER.error(consoleUrl) raise RuntimeError( 'Timeout after %d seconds while waiting for job %s ' 'enters expected state %s. Current state is %s.' % (timeout, self.result.job_id(), expected_state, self.result.state)) - \ No newline at end of file From 72295ffa59673ce9e8f5122b15eb63f1984de350 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Fri, 6 May 2022 12:31:37 -0500 Subject: [PATCH 07/13] typo invoking method --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index df1676095c3f..cb5ac0b7ec15 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1609,7 +1609,7 @@ def is_in_terminal_state(self): def wait_until_finish(self, duration=None): consoleUrl = """Console URL: 'https://console.cloud.google.com/ - dataflow/jobs//{}?project='""".format(self.job_id) + dataflow/jobs//{}?project='""".format(self.job_id()) if not self.is_in_terminal_state(): if not self.has_job: raise IOError('Failed to get the Dataflow job id.') From 721653b1066c15050e0d4fe023b6b7b30b937d39 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Fri, 6 May 2022 12:33:11 -0500 Subject: [PATCH 08/13] typo invoke method --- .../python/apache_beam/runners/dataflow/test_dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index d0b1b10913a9..166be2c7d538 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -87,7 +87,7 @@ def build_console_url(self, options): def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): """Wait until Dataflow pipeline enters a certain state.""" consoleUrl = """Console URL:https://console.cloud.google.com/dataflow/jobs/ - /{}?project=""".format(self.result.job_id) + /{}?project=""".format(self.result.job_id()) if not self.result.has_job: _LOGGER.error(consoleUrl) raise IOError('Failed to get the Dataflow job id.') From 5dd6336c88d63941a272e4a5a8963dd5652fafc3 Mon Sep 17 00:00:00 2001 From: tvalentyn Date: Mon, 9 May 2022 20:37:14 +0200 Subject: [PATCH 09/13] Update sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py --- .../python/apache_beam/runners/dataflow/test_dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index 166be2c7d538..8d393b0fae77 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -86,7 +86,7 @@ def build_console_url(self, options): def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): """Wait until Dataflow pipeline enters a certain state.""" - consoleUrl = """Console URL:https://console.cloud.google.com/dataflow/jobs/ + consoleUrl = """Console URL: https://console.cloud.google.com/dataflow/jobs/ /{}?project=""".format(self.result.job_id()) if not self.result.has_job: _LOGGER.error(consoleUrl) From 0b6ad35304b8f45c15cd04d072f6422d4c349153 Mon Sep 17 00:00:00 2001 From: tvalentyn Date: Mon, 9 May 2022 20:37:24 +0200 Subject: [PATCH 10/13] Update sdks/python/apache_beam/runners/dataflow/dataflow_runner.py --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index cb5ac0b7ec15..577ac101d857 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1630,7 +1630,7 @@ def wait_until_finish(self, duration=None): # is_in_terminal_state. terminated = self.is_in_terminal_state() assert duration or terminated, ( - 'Job did not reach to a terminal state after waiting indefinitely.' + 'Job did not reach to a terminal state after waiting indefinitely. ' '{}'.format(consoleUrl)) if terminated and self.state != PipelineState.DONE: From 43958513239cbc29c600e5f421d823ba5a24ac72 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Mon, 9 May 2022 16:35:35 -0500 Subject: [PATCH 11/13] Fix ereror with Pre-commit --- .../python/apache_beam/runners/dataflow/dataflow_runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index cb5ac0b7ec15..18e78efee68f 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1608,12 +1608,13 @@ def is_in_terminal_state(self): return PipelineState.is_terminal(self._get_job_state()) def wait_until_finish(self, duration=None): - consoleUrl = """Console URL: 'https://console.cloud.google.com/ - dataflow/jobs//{}?project='""".format(self.job_id()) + if not self.is_in_terminal_state(): if not self.has_job: raise IOError('Failed to get the Dataflow job id.') - + + consoleUrl = """Console URL: 'https://console.cloud.google.com/ + dataflow/jobs//{}?project='""".format(self.job_id()) thread = threading.Thread( target=DataflowRunner.poll_for_job_completion, args=(self._runner, self, duration)) From 923f4d56d290c0d7323c088001045f12152a8492 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Mon, 9 May 2022 18:17:50 -0500 Subject: [PATCH 12/13] Fix style errors --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index f1f4f8ac5601..58238c9ed734 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1608,13 +1608,12 @@ def is_in_terminal_state(self): return PipelineState.is_terminal(self._get_job_state()) def wait_until_finish(self, duration=None): - if not self.is_in_terminal_state(): if not self.has_job: raise IOError('Failed to get the Dataflow job id.') - consoleUrl = """Console URL: 'https://console.cloud.google.com/ - dataflow/jobs//{}?project='""".format(self.job_id()) + dataflow/jobs//{}?project= + '""".format(self.job_id()) thread = threading.Thread( target=DataflowRunner.poll_for_job_completion, args=(self._runner, self, duration)) From 3e5ccbbabc9af86ae00cb15ccc38f9917b3a97a4 Mon Sep 17 00:00:00 2001 From: andoni-guzman Date: Tue, 10 May 2022 12:08:47 -0500 Subject: [PATCH 13/13] Fix multiline string --- .../python/apache_beam/runners/dataflow/dataflow_runner.py | 7 ++++--- .../apache_beam/runners/dataflow/test_dataflow_runner.py | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 58238c9ed734..169b6a6cef2e 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1611,9 +1611,10 @@ def wait_until_finish(self, duration=None): if not self.is_in_terminal_state(): if not self.has_job: raise IOError('Failed to get the Dataflow job id.') - consoleUrl = """Console URL: 'https://console.cloud.google.com/ - dataflow/jobs//{}?project= - '""".format(self.job_id()) + consoleUrl = ( + "Console URL: https://console.cloud.google.com/" + f"dataflow/jobs//{self.job_id()}" + "?project=") thread = threading.Thread( target=DataflowRunner.poll_for_job_completion, args=(self._runner, self, duration)) diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index 8d393b0fae77..58bc05c39509 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -86,8 +86,9 @@ def build_console_url(self, options): def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT): """Wait until Dataflow pipeline enters a certain state.""" - consoleUrl = """Console URL: https://console.cloud.google.com/dataflow/jobs/ - /{}?project=""".format(self.result.job_id()) + consoleUrl = ( + "Console URL: https://console.cloud.google.com/dataflow/" + f"/{self.result.job_id()}?project=") if not self.result.has_job: _LOGGER.error(consoleUrl) raise IOError('Failed to get the Dataflow job id.')