Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,25 +408,47 @@ def consume_logs(
"""
last_captured_timestamp = None
try:
if not logs:
logs = self.read_pod_logs(
pod=pod,
container_name=container_name,
timestamps=True,
since_seconds=(
math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
),
follow=follow,
post_termination_timeout=post_termination_timeout,
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
line_timestamp, message = self.parse_log_line(line)
logs = self.read_pod_logs(
pod=pod,
container_name=container_name,
timestamps=True,
since_seconds=(
math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
),
follow=follow,
post_termination_timeout=post_termination_timeout,
)
message_to_log = None
message_timestamp = None
progress_callback_lines = []
try:
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
line_timestamp, message = self.parse_log_line(line)
if line_timestamp: # detect new log line
if message_to_log is None: # first line in the log
message_to_log = message
message_timestamp = line_timestamp
progress_callback_lines.append(line)
else: # previous log line is complete
if self._progress_callback:
for line in progress_callback_lines:
self._progress_callback(line)
self.log.info("[%s] %s", container_name, message_to_log)
last_captured_timestamp = message_timestamp
message_to_log = message
message_timestamp = line_timestamp
progress_callback_lines = [line]
else: # continuation of the previous log line
message_to_log = f"{message_to_log}\n{message}"
progress_callback_lines.append(line)
finally:
# log the last line and update the last_captured_timestamp
if self._progress_callback:
self._progress_callback(line)
if line_timestamp is not None:
last_captured_timestamp = line_timestamp
self.log.info("[%s] %s", container_name, message)
for line in progress_callback_lines:
self._progress_callback(line)
self.log.info("[%s] %s", container_name, message_to_log)
last_captured_timestamp = message_timestamp
except BaseHTTPError as e:
self.log.warning(
"Reading of logs interrupted for container %r with error %r; will retry. "
Expand Down Expand Up @@ -570,16 +592,10 @@ def parse_log_line(self, line: str) -> tuple[DateTime | None, str]:
"""
timestamp, sep, message = line.strip().partition(" ")
if not sep:
self.log.error(
"Error parsing timestamp (no timestamp in message %r). "
"Will continue execution but won't update timestamp",
line,
)
return None, line
try:
last_log_time = cast(DateTime, pendulum.parse(timestamp))
except ParserError:
self.log.error("Error parsing timestamp. Will continue execution but won't update timestamp")
return None, line
return last_log_time, message

Expand Down
2 changes: 1 addition & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ def test_volume_mount(self, mock_get_connection):
)
context = create_context(k)
k.execute(context=context)
mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved from mount")
mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved from mount\n")
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod["spec"]["containers"][0]["args"] = args
self.expected_pod["spec"]["containers"][0]["volumeMounts"] = [
Expand Down
27 changes: 23 additions & 4 deletions tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,30 @@ def consumer_iter():
assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string))
assert self.mock_progress_callback.call_count == expected_call_count

def test_parse_invalid_log_line(self, caplog):
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
def test_parse_multi_line_logs(self, mock_read_pod_logs, mock_container_is_running, caplog):
log = (
"2020-10-08T14:16:17.793417674Z message1 line1\n"
"message1 line2\n"
"message1 line3\n"
"2020-10-08T14:16:18.793417674Z message2 line1\n"
"message2 line2\n"
"2020-10-08T14:16:19.793417674Z message3 line1\n"
)
mock_read_pod_logs.return_value = [bytes(log_line, "utf-8") for log_line in log.split("\n")]
mock_container_is_running.return_value = False

with caplog.at_level(logging.INFO):
self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")
assert "Invalidmessage" in caplog.text
assert "no timestamp in message" in caplog.text
self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)

assert "message1 line1" in caplog.text
assert "message1 line2" in caplog.text
assert "message1 line3" in caplog.text
assert "message2 line1" in caplog.text
assert "message2 line2" in caplog.text
assert "message3 line1" in caplog.text
assert "ERROR" not in caplog.text

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
def test_start_pod_retries_on_409_error(self, mock_run_pod_async):
Expand Down