@snjypl
Contributor

@snjypl snjypl commented Jan 9, 2023

[Note: I have set delete_worker_pods to False so that the pod is not deleted after completion.]

With the k8s executor, trying to view the task log from the UI produces the following error:

airflow-webserver-5bf48475c-zdjxv
*** Trying to get logs (last 100 lines) from worker pod airflow-webserver-5bf48475c-zdjxv ***

*** Unable to fetch logs from worker pod airflow-webserver-5bf48475c-zdjxv ***
('Cannot find pod for ti %s', <TaskInstance: dataset_produces_2.producing_task_2 manual__2022-12-28T21:13:27.229615+00:00 [success]>)

I think the issue is that, when calling PodGenerator.build_selector_for_k8s_executor_pod, we pass ti.try_number instead of the try_number that was passed to the _read method.

def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):

selector = PodGenerator.build_selector_for_k8s_executor_pod(
    dag_id=ti.dag_id,
    task_id=ti.task_id,
    try_number=ti.try_number,
    map_index=ti.map_index,
    run_id=ti.run_id,
    airflow_worker=ti.queued_by_job_id,
)
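
To illustrate why the pod lookup can fail, here is a minimal, self-contained sketch. `build_selector` and `matches` are hypothetical stand-ins, not the real `PodGenerator.build_selector_for_k8s_executor_pod`, and the label names and values are assumptions. The sketch only shows that a selector built from a try number other than the one the pod was labelled with no longer matches that pod.

```python
from __future__ import annotations


def build_selector(dag_id: str, task_id: str, try_number: int, run_id: str) -> str:
    """Hypothetical stand-in: join labels into a Kubernetes-style equality selector."""
    labels = {
        "dag_id": dag_id,
        "task_id": task_id,
        "try_number": str(try_number),
        "run_id": run_id,
    }
    return ",".join(f"{key}={value}" for key, value in sorted(labels.items()))


def matches(selector: str, pod_labels: dict[str, str]) -> bool:
    """Return True if every key=value pair in the selector is present on the pod."""
    pairs = (item.split("=", 1) for item in selector.split(","))
    return all(pod_labels.get(key) == value for key, value in pairs)


# Labels the worker pod was created with for try 1 (assumed values).
pod_labels = {
    "dag_id": "dataset_produces_2",
    "task_id": "producing_task_2",
    "try_number": "1",
    "run_id": "manual_run",
}

requested_try = 1  # the try the UI asked for, i.e. the argument given to _read
ti_try_number = 2  # assumption: the value on the TaskInstance has since moved on

good = build_selector("dataset_produces_2", "producing_task_2", requested_try, "manual_run")
bad = build_selector("dataset_produces_2", "producing_task_2", ti_try_number, "manual_run")

print(matches(good, pod_labels))  # selector built from the requested try
print(matches(bad, pod_labels))   # selector built from ti.try_number
```

Under these assumptions, only the selector built from the requested try number finds the pod, which is consistent with the "Cannot find pod" error above.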



@ashb
Member

ashb commented Jan 11, 2023

Paging @dstandish

@snjypl snjypl force-pushed the bugfix/Fix-Unable-to-fetch-logs-from-worker-pod-error-in-UI-for-k8s-executor branch 4 times, most recently from 752c046 to 14724ce Compare January 18, 2023 12:17
@snjypl snjypl force-pushed the bugfix/Fix-Unable-to-fetch-logs-from-worker-pod-error-in-UI-for-k8s-executor branch 2 times, most recently from 05f0d7d to aa64a4f Compare January 23, 2023 10:50
@eladkal eladkal added this to the Airflow 2.5.2 milestone Jan 23, 2023
@snjypl snjypl force-pushed the bugfix/Fix-Unable-to-fetch-logs-from-worker-pod-error-in-UI-for-k8s-executor branch 2 times, most recently from b54cdcc to 6d1d744 Compare January 24, 2023 11:52
@dstandish
Contributor

Looks reasonable to me, but needs a rebase

@snjypl snjypl force-pushed the bugfix/Fix-Unable-to-fetch-logs-from-worker-pod-error-in-UI-for-k8s-executor branch from 6d1d744 to bb17bf5 Compare February 14, 2023 20:17
Contributor

@o-nikolas o-nikolas left a comment

lgtm, pending passing tests of course.

Comment on lines +359 to 360
Contributor

Wait, this bit doesn't make sense to me. If we already have the TI, why do we also need to supply the try number?

Contributor

There is some explanation in the code for read:

def read(self, task_instance, try_number=None, metadata=None):
    """
    Read logs of given task instance from local machine.
    :param task_instance: task instance object
    :param try_number: task instance try_number to read logs from. If None
        it returns all logs separated by try_number
    :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
    :return: a list of listed tuples which order log string by host
    """
    # Task instance increments its try number when it starts to run.
    # So the log for a particular task try will only show up when
    # try number gets incremented in DB, i.e logs produced the time
    # after cli run and before try_number + 1 in DB will not be displayed.
    if try_number is None:
        next_try = task_instance.next_try_number
        try_numbers = list(range(1, next_try))
    elif try_number < 1:
        logs = [
            [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
        ]
        return logs, [{"end_of_log": True}]
    else:
        try_numbers = [try_number]
    logs = [""] * len(try_numbers)
    metadata_array = [{}] * len(try_numbers)
    # subclasses implement _read and may not have log_type, which was added recently
    for i, try_number_element in enumerate(try_numbers):
        log, out_metadata = self._read(task_instance, try_number_element, metadata)
        # es_task_handler return logs grouped by host. wrap other handler returning log string
        # with default/ empty host so that UI can render the response in the same way
        logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
        metadata_array[i] = out_metadata
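
The branching quoted above can be isolated into a small sketch that shows which try numbers end up being read. `resolve_try_numbers` is a hypothetical helper, not part of Airflow, and it raises a `ValueError` where the quoted code returns an error log entry.

```python
from __future__ import annotations


def resolve_try_numbers(try_number: int | None, next_try_number: int) -> list[int]:
    """Hypothetical helper mirroring how read() picks the tries to fetch."""
    if try_number is None:
        # No specific try requested: read every try that has already started.
        return list(range(1, next_try_number))
    if try_number < 1:
        # The quoted code returns an error log here; this sketch raises instead.
        raise ValueError(f"Try number {try_number} is invalid.")
    # A specific try was requested: read exactly that one.
    return [try_number]


print(resolve_try_numbers(None, 4))  # [1, 2, 3]
print(resolve_try_numbers(2, 4))     # [2]
```

Each resolved value is what gets handed to `_read` as `try_number`, so it can differ from whatever try number the task instance itself currently reports.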

try_number is plumbed in from read to _read and should then have been passed on to the executor, but that step was missed:

def _read(
    self,
    ti: TaskInstance,
    try_number: int,
    metadata: dict[str, Any] | None = None,
):
    """
    Template method that contains custom logic of reading
    logs given the try_number.
    :param ti: task instance record
    :param try_number: current try_number to read log from
    :param metadata: log metadata,
        can be used for streaming log reading and auto-tailing.
        Following attributes are used:
        log_pos: (absolute) Char position to which the log
            which was retrieved in previous calls, this
            part will be skipped and only following text
            returned to be added to tail.
    :return: log message as a string and metadata.
        Following attributes are used in metadata:
        end_of_log: Boolean, True if end of log is reached or False
            if further calls might get more log text.
            This is determined by the status of the TaskInstance
        log_pos: (absolute) Char position to which the log is retrieved
    """
    # Task instance here might be different from task instance when
    # initializing the handler. Thus explicitly getting log location
    # is needed to get correct log path.
    worker_log_rel_path = self._render_filename(ti, try_number)
    messages_list: list[str] = []
    remote_logs: list[str] = []
    running_logs: list[str] = []
    local_logs: list[str] = []
    executor_messages: list[str] = []
    executor_logs: list[str] = []
    served_logs: list[str] = []
    with suppress(NotImplementedError):
        remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
        messages_list.extend(remote_messages)
    if ti.state == TaskInstanceState.RUNNING:
        response = self._executor_get_task_log(ti)

@snjypl snjypl force-pushed the bugfix/Fix-Unable-to-fetch-logs-from-worker-pod-error-in-UI-for-k8s-executor branch 5 times, most recently from 190d348 to 60f7d12 Compare February 16, 2023 14:45
ti.try_number was used for fetching the log from the k8s pod,
which caused the wrong log to be returned for the k8s pod.
Fixed by passing try_number from _read to the get_task_log method.
Use the try_number argument instead of ti.try_number for selecting the pod in
the k8s executor.
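
The shape of the fix described in these commit messages can be sketched as follows. `FakeTaskInstance`, `FakeExecutor`, and `FakeHandler` are hypothetical stand-ins rather than Airflow classes, and the two-argument `get_task_log(ti, try_number)` signature is assumed from the commit message, not copied from the merged code.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeTaskInstance:
    task_id: str
    try_number: int  # may not equal the try whose log is being requested


@dataclass
class FakeExecutor:
    # Logs keyed by (task_id, try_number), standing in for one worker pod per try.
    pod_logs: dict[tuple[str, int], str] = field(default_factory=dict)

    def get_task_log(self, ti: FakeTaskInstance, try_number: int) -> str:
        key = (ti.task_id, try_number)
        if key not in self.pod_logs:
            raise LookupError(f"Cannot find pod for {ti.task_id} try {try_number}")
        return self.pod_logs[key]


class FakeHandler:
    def __init__(self, executor: FakeExecutor) -> None:
        self.executor = executor

    def read_before_fix(self, ti: FakeTaskInstance, try_number: int) -> str:
        # Ignores the requested try and uses the value stored on the task instance.
        return self.executor.get_task_log(ti, ti.try_number)

    def read_after_fix(self, ti: FakeTaskInstance, try_number: int) -> str:
        # Forwards the try number that the caller actually asked for.
        return self.executor.get_task_log(ti, try_number)


executor = FakeExecutor({("producing_task_2", 1): "log of try 1"})
ti = FakeTaskInstance("producing_task_2", try_number=2)
handler = FakeHandler(executor)

print(handler.read_after_fix(ti, 1))
```

In this sketch `read_before_fix(ti, 1)` raises `LookupError`, because it looks for a pod belonging to try 2, while `read_after_fix(ti, 1)` returns the log of the requested try.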
@snjypl snjypl force-pushed the bugfix/Fix-Unable-to-fetch-logs-from-worker-pod-error-in-UI-for-k8s-executor branch from 60f7d12 to 7357b53 Compare February 27, 2023 07:03
@o-nikolas
Contributor

Anyone have more feedback for this one or shall I merge it?

@potiuk
Member

potiuk commented Mar 1, 2023

@dstandish ?

@pierrejeambrun pierrejeambrun added the type:bug-fix Changelog: Bug Fixes label Mar 1, 2023
@o-nikolas
Contributor

Shall we just merge this fix then @potiuk @eladkal? Worst case we can always follow-up with more changes.

@eladkal eladkal modified the milestones: Airflow 2.5.2, Airflow 2.5.3 Mar 9, 2023
@eladkal eladkal merged commit f5ed4d5 into apache:main Mar 9, 2023
@snjypl snjypl deleted the bugfix/Fix-Unable-to-fetch-logs-from-worker-pod-error-in-UI-for-k8s-executor branch March 9, 2023 15:13
@pierrejeambrun
Member

Conflicting, requires #29482 and #28161. Marking for 2.6.

Labels

area:logging type:bug-fix Changelog: Bug Fixes
