Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1611,7 +1611,10 @@ def wait_until_finish(self, duration=None):
if not self.is_in_terminal_state():
if not self.has_job:
raise IOError('Failed to get the Dataflow job id.')

consoleUrl = (
"Console URL: https://console.cloud.google.com/"
f"dataflow/jobs/<RegionId>/{self.job_id()}"
"?project=<ProjectId>")
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
args=(self._runner, self, duration))
Expand All @@ -1628,11 +1631,13 @@ def wait_until_finish(self, duration=None):
# is_in_terminal_state.
terminated = self.is_in_terminal_state()
assert duration or terminated, (
'Job did not reach to a terminal state after waiting indefinitely.')
'Job did not reach to a terminal state after waiting indefinitely. '
'{}'.format(consoleUrl))

if terminated and self.state != PipelineState.DONE:
# TODO(BEAM-1290): Consider converting this to an error log based on
# the resolution of the issue.
_LOGGER.error(consoleUrl)
raise DataflowRuntimeException(
'Dataflow pipeline failed. State: %s, Error:\n%s' %
(self.state, getattr(self._runner, 'last_error_msg', None)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def run_pipeline(self, pipeline, options):
# TODO(markflyhigh)(BEAM-1890): Use print since Nose doesn't show logs
# in some cases.
print('Worker logs: %s' % self.build_console_url(options))
_LOGGER.info('Console log: ')
_LOGGER.info(self.build_console_url(options))

try:
self.wait_until_in_state(PipelineState.RUNNING)
Expand Down Expand Up @@ -84,7 +86,11 @@ def build_console_url(self, options):

def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT):
"""Wait until Dataflow pipeline enters a certain state."""
consoleUrl = (
"Console URL: https://console.cloud.google.com/dataflow/"
f"<regionId>/{self.result.job_id()}?project=<projectId>")
if not self.result.has_job:
_LOGGER.error(consoleUrl)
raise IOError('Failed to get the Dataflow job id.')

start_time = time.time()
Expand All @@ -93,7 +99,7 @@ def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT):
if self.result.is_in_terminal_state() or job_state == expected_state:
return job_state
time.sleep(5)

_LOGGER.error(consoleUrl)
raise RuntimeError(
'Timeout after %d seconds while waiting for job %s '
'enters expected state %s. Current state is %s.' %
Expand Down