From f287ebc41ea7aff4874834be16f358ba1bea3171 Mon Sep 17 00:00:00 2001 From: bugkeep <1921817430@qq.com> Date: Sat, 25 Apr 2026 23:00:07 +0800 Subject: [PATCH 1/3] fix(core): downscale oversized images --- astrbot/core/utils/media_utils.py | 22 +++++- tests/unit/test_media_utils_compress_image.py | 73 +++++++++++++++++++ 2 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 tests/unit/test_media_utils_compress_image.py diff --git a/astrbot/core/utils/media_utils.py b/astrbot/core/utils/media_utils.py index 40f1e60495..f00edee137 100644 --- a/astrbot/core/utils/media_utils.py +++ b/astrbot/core/utils/media_utils.py @@ -436,19 +436,37 @@ async def compress_image( optimize = IMAGE_COMPRESS_DEFAULT_OPTIMIZE min_file_size_bytes = int(IMAGE_COMPRESS_DEFAULT_MIN_FILE_SIZE_MB * 1024 * 1024) data = None + + def _exceeds_max_size_bytes(raw: bytes) -> bool: + try: + with PILImage.open(io.BytesIO(raw)) as opened_img: + return max(opened_img.size) > max_size + except Exception: # noqa: BLE001 + return False + + def _exceeds_max_size_path(path: Path) -> bool: + try: + with PILImage.open(path) as opened_img: + return max(opened_img.size) > max_size + except Exception: # noqa: BLE001 + return False + # Skip compression for remote images and return the original value. if url_or_path.startswith("http"): return url_or_path elif url_or_path.startswith("data:image"): _header, encoded = url_or_path.split(",", 1) data = base64.b64decode(encoded) - if len(data) < min_file_size_bytes: + if len(data) < min_file_size_bytes and not _exceeds_max_size_bytes(data): return url_or_path else: local_path = Path(url_or_path) if not local_path.exists(): return url_or_path - if local_path.stat().st_size < min_file_size_bytes: + if ( + local_path.stat().st_size < min_file_size_bytes + and not _exceeds_max_size_path(local_path) + ): return url_or_path with local_path.open("rb") as f: data = f.read() diff --git a/tests/unit/test_media_utils_compress_image.py b/tests/unit/test_media_utils_compress_image.py new file mode 100644 index 0000000000..fc4d32c69e --- /dev/null +++ b/tests/unit/test_media_utils_compress_image.py @@ -0,0 +1,73 @@ +"""Tests for media_utils.compress_image().""" + +import base64 +from pathlib import Path + +import pytest +from PIL import Image as PILImage + +from astrbot.core.utils import media_utils + + +@pytest.mark.asyncio +async def test_compress_image_downscales_small_files_exceeding_max_size( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +): + monkeypatch.setattr(media_utils, "get_astrbot_temp_path", lambda: str(tmp_path)) + + img_path = tmp_path / "big.png" + with PILImage.new("RGB", (1900, 2532), color=(255, 0, 0)) as img: + img.save(img_path, format="PNG", optimize=True) + + assert img_path.stat().st_size < 1024 * 1024 + + compressed_path = await media_utils.compress_image(str(img_path), max_size=1280) + + assert compressed_path != str(img_path) + assert Path(compressed_path).exists() + with PILImage.open(compressed_path) as compressed_img: + assert max(compressed_img.size) <= 1280 + + +@pytest.mark.asyncio +async def test_compress_image_skips_small_files_within_max_size( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +): + monkeypatch.setattr(media_utils, "get_astrbot_temp_path", lambda: str(tmp_path)) + + img_path = tmp_path / "small.png" + with PILImage.new("RGB", (800, 600), color=(0, 0, 255)) as img: + img.save(img_path, format="PNG", optimize=True) + + assert img_path.stat().st_size < 1024 * 1024 + + compressed_path = await media_utils.compress_image(str(img_path), max_size=1280) + + assert compressed_path == str(img_path) + + +@pytest.mark.asyncio +async def test_compress_image_downscales_data_url_even_when_small( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +): + monkeypatch.setattr(media_utils, "get_astrbot_temp_path", lambda: str(tmp_path)) + + buffer = Path(tmp_path) / "big.png" + with PILImage.new("RGB", (1900, 2532), color=(0, 255, 0)) as img: + img.save(buffer, format="PNG", optimize=True) + raw = buffer.read_bytes() + + assert len(raw) < 1024 * 1024 + + encoded = base64.b64encode(raw).decode() + data_url = f"data:image/png;base64,{encoded}" + compressed_path = await media_utils.compress_image(data_url, max_size=1280) + + assert compressed_path != data_url + assert Path(compressed_path).exists() + with PILImage.open(compressed_path) as compressed_img: + assert max(compressed_img.size) <= 1280 + From e6c0764516fc60f3fbac2a8dfdc3144d93bf90cc Mon Sep 17 00:00:00 2001 From: bugkeep <1921817430@qq.com> Date: Sun, 26 Apr 2026 20:19:18 +0800 Subject: [PATCH 2/3] refactor: share image max-size check helper --- astrbot/core/utils/media_utils.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/astrbot/core/utils/media_utils.py b/astrbot/core/utils/media_utils.py index f00edee137..03d7912cb6 100644 --- a/astrbot/core/utils/media_utils.py +++ b/astrbot/core/utils/media_utils.py @@ -437,16 +437,10 @@ async def compress_image( min_file_size_bytes = int(IMAGE_COMPRESS_DEFAULT_MIN_FILE_SIZE_MB * 1024 * 1024) data = None - def _exceeds_max_size_bytes(raw: bytes) -> bool: + def _exceeds_max_size(source: bytes | Path) -> bool: try: - with PILImage.open(io.BytesIO(raw)) as opened_img: - return max(opened_img.size) > max_size - except Exception: # noqa: BLE001 - return False - - def _exceeds_max_size_path(path: Path) -> bool: - try: - with PILImage.open(path) as opened_img: + fp = io.BytesIO(source) if isinstance(source, bytes) else source + with PILImage.open(fp) as opened_img: return max(opened_img.size) > max_size except Exception: # noqa: BLE001 return False @@ -457,15 +451,14 @@ def _exceeds_max_size_path(path: Path) -> bool: elif url_or_path.startswith("data:image"): _header, encoded = url_or_path.split(",", 1) data = base64.b64decode(encoded) - if len(data) < min_file_size_bytes and not _exceeds_max_size_bytes(data): + if len(data) < min_file_size_bytes and not _exceeds_max_size(data): return url_or_path else: local_path = Path(url_or_path) if not local_path.exists(): return url_or_path - if ( - local_path.stat().st_size < min_file_size_bytes - and not _exceeds_max_size_path(local_path) + if local_path.stat().st_size < min_file_size_bytes and not _exceeds_max_size( + local_path ): return url_or_path with local_path.open("rb") as f: From 174a19cff34920e8a6b995946af31dc88c77a5e4 Mon Sep 17 00:00:00 2001 From: Weilong Liao <37870767+Soulter@users.noreply.github.com> Date: Sun, 26 Apr 2026 23:10:16 +0800 Subject: [PATCH 3/3] Delete tests/unit/test_media_utils_compress_image.py --- tests/unit/test_media_utils_compress_image.py | 73 ------------------- 1 file changed, 73 deletions(-) delete mode 100644 tests/unit/test_media_utils_compress_image.py diff --git a/tests/unit/test_media_utils_compress_image.py b/tests/unit/test_media_utils_compress_image.py deleted file mode 100644 index fc4d32c69e..0000000000 --- a/tests/unit/test_media_utils_compress_image.py +++ /dev/null @@ -1,73 +0,0 @@ -"""Tests for media_utils.compress_image().""" - -import base64 -from pathlib import Path - -import pytest -from PIL import Image as PILImage - -from astrbot.core.utils import media_utils - - -@pytest.mark.asyncio -async def test_compress_image_downscales_small_files_exceeding_max_size( - tmp_path: Path, - monkeypatch: pytest.MonkeyPatch, -): - monkeypatch.setattr(media_utils, "get_astrbot_temp_path", lambda: str(tmp_path)) - - img_path = tmp_path / "big.png" - with PILImage.new("RGB", (1900, 2532), color=(255, 0, 0)) as img: - img.save(img_path, format="PNG", optimize=True) - - assert img_path.stat().st_size < 1024 * 1024 - - compressed_path = await media_utils.compress_image(str(img_path), max_size=1280) - - assert compressed_path != str(img_path) - assert Path(compressed_path).exists() - with PILImage.open(compressed_path) as compressed_img: - assert max(compressed_img.size) <= 1280 - - -@pytest.mark.asyncio -async def test_compress_image_skips_small_files_within_max_size( - tmp_path: Path, - monkeypatch: pytest.MonkeyPatch, -): - monkeypatch.setattr(media_utils, "get_astrbot_temp_path", lambda: str(tmp_path)) - - img_path = tmp_path / "small.png" - with PILImage.new("RGB", (800, 600), color=(0, 0, 255)) as img: - img.save(img_path, format="PNG", optimize=True) - - assert img_path.stat().st_size < 1024 * 1024 - - compressed_path = await media_utils.compress_image(str(img_path), max_size=1280) - - assert compressed_path == str(img_path) - - -@pytest.mark.asyncio -async def test_compress_image_downscales_data_url_even_when_small( - tmp_path: Path, - monkeypatch: pytest.MonkeyPatch, -): - monkeypatch.setattr(media_utils, "get_astrbot_temp_path", lambda: str(tmp_path)) - - buffer = Path(tmp_path) / "big.png" - with PILImage.new("RGB", (1900, 2532), color=(0, 255, 0)) as img: - img.save(buffer, format="PNG", optimize=True) - raw = buffer.read_bytes() - - assert len(raw) < 1024 * 1024 - - encoded = base64.b64encode(raw).decode() - data_url = f"data:image/png;base64,{encoded}" - compressed_path = await media_utils.compress_image(data_url, max_size=1280) - - assert compressed_path != data_url - assert Path(compressed_path).exists() - with PILImage.open(compressed_path) as compressed_img: - assert max(compressed_img.size) <= 1280 -