diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py index 7c21f350cc541..9c4b45e6a2aa9 100644 --- a/airflow/providers/docker/operators/docker.py +++ b/airflow/providers/docker/operators/docker.py @@ -434,7 +434,8 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[ for log_chunk in logstream: log_chunk = stringify(log_chunk).strip() log_lines.append(log_chunk) - self.log.info("%s", log_chunk) + for log_chunk_line in log_chunk.split("\n"): + self.log.info("%s", log_chunk_line) result = self.cli.wait(self.container["Id"]) if result["StatusCode"] in self.skip_on_exit_code: @@ -492,7 +493,7 @@ def copy_from_docker(container_id, src): def execute(self, context: Context) -> list[str] | str | None: # Pull the docker image if `force_pull` is set or image does not exist locally if self.force_pull or not self.cli.images(name=self.image): - self.log.info("Pulling docker image %s", self.image) + self.log.info("::group::Pulling docker image %s", self.image) latest_status: dict[str, str] = {} for output in self.cli.pull(self.image, stream=True, decode=True): if isinstance(output, str): @@ -508,6 +509,7 @@ def execute(self, context: Context) -> list[str] | str | None: if latest_status.get(output_id) != output_status: self.log.info("%s: %s", output_id, output_status) latest_status[output_id] = output_status + self.log.info("::endgroup::") return self._run_image() @staticmethod