-
Notifications
You must be signed in to change notification settings - Fork 152
Rewrite gzip component to support trailing bytes without external tool and compress with PIGZ. Addresses #476 #485
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
rbs-jacob
merged 20 commits into
redballoonsecurity:master
from
alchzh:python-gzip-trailing-bytes
Aug 15, 2024
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
0a8c069
Rewrite gzip component to support trailing bytes without external too…
alchzh 3331f71
Switch to pigz for 4MiB or larger files and pipe directly to stdin in…
alchzh eecfcbe
Update docstring in test_gzip_component.py
alchzh 37ba179
Refactor unpack logic to separate functions
alchzh ed9005f
cache result of is_tool_installed
alchzh e9fbdb3
Comprehensive gzip test cases
alchzh 9b5315a
Make ComponentExternalTool hashable based on tool and install_check_arg
alchzh c0640ad
Merge branch 'redballoonsecurity:master' into python-gzip-trailing-bytes
alchzh 7144874
Update previous gzip related changelog message
alchzh 6305010
Actually use pigz as a fallback, clarify changelog message
alchzh 5e2c620
Raise NotImplementedError instance in write_gzip() and make it abstra…
alchzh ad6469a
Revert caching of is_tool_installed in ComponentExternalTool
alchzh 595e5e1
Cache PIGZ installed or not in gzip component module
alchzh 595b9af
Test that PIGZ is used for packing large file and NOT used for small …
alchzh 8609b8e
Only use PIGZ for compression, not decompression
alchzh f996194
Merge remote-tracking branch 'origin/master' into python-gzip-trailin…
alchzh 9753ca5
Update ofrak_core/CHANGELOG.md
whyitfor d954073
Improve comments in `unpack_with_zlib_module`
alchzh bec8fa1
Correctly handle multiple member decompression
alchzh 88f230b
Move wbits comment line
alchzh
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
162 changes: 92 additions & 70 deletions
162
ofrak_core/test_ofrak/components/test_gzip_component.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,87 +1,109 @@ | ||
| import os | ||
| import subprocess | ||
| import tempfile | ||
| from gzip import GzipFile | ||
| from io import BytesIO | ||
|
|
||
| import zlib | ||
| import gzip | ||
| from pathlib import Path | ||
| from asyncio import create_subprocess_exec | ||
| from typing import Tuple | ||
| from unittest.mock import patch | ||
| from abc import ABC, abstractmethod | ||
|
|
||
| from ofrak.component.abstract import ComponentSubprocessError | ||
| import pytest | ||
|
|
||
| from ofrak import OFRAKContext | ||
| from ofrak.ofrak_context import OFRAKContext | ||
| from ofrak.resource import Resource | ||
| from ofrak.core.gzip import GzipData | ||
| from pytest_ofrak.patterns.compressed_filesystem_unpack_modify_pack import ( | ||
| CompressedFileUnpackModifyPackPattern, | ||
| ) | ||
| from pytest_ofrak.patterns.unpack_modify_pack import UnpackModifyPackPattern | ||
|
|
||
| ASSETS_DIR = Path(__file__).parent / "assets" | ||
|
|
||
|
|
||
| class TestGzipUnpackModifyPack(CompressedFileUnpackModifyPackPattern): | ||
| @pytest.fixture( | ||
| autouse=True, | ||
| scope="module", | ||
| params=[ | ||
| (ASSETS_DIR / "hello_world", ASSETS_DIR / "hello_ofrak", False), | ||
| (ASSETS_DIR / "random8M", ASSETS_DIR / "random8M_modified", True), | ||
| ], | ||
| ids=["hello world", "<random 8MB data>"], | ||
| ) | ||
| def gzip_test_input(request): | ||
| initial_path, repacked_path, expect_pigz = request.param | ||
| with open(initial_path, "rb") as initial_file: | ||
| initial_data = initial_file.read() | ||
| with open(repacked_path, "rb") as repacked_file: | ||
| expected_repacked_data = repacked_file.read() | ||
| return (initial_data, expected_repacked_data, expect_pigz) | ||
|
|
||
|
|
||
| class GzipUnpackModifyPackPattern(CompressedFileUnpackModifyPackPattern, ABC): | ||
| """ | ||
| Template for tests that test different inputs the gzip component should support | ||
| unpacking. | ||
| """ | ||
|
|
||
| EXPECT_PIGZ: bool | ||
| expected_tag = GzipData | ||
|
|
||
| @pytest.fixture(autouse=True) | ||
| def create_test_file(self, tmpdir): | ||
| d = tmpdir.mkdir("gzip") | ||
| fh = d.join("hello.gz") | ||
| result = BytesIO() | ||
| with GzipFile(fileobj=result, mode="w") as gzip_file: | ||
| gzip_file.write(self.INITIAL_DATA) | ||
| fh.write_binary(result.getvalue()) | ||
| @abstractmethod | ||
| def write_gzip(self, gzip_path: Path): | ||
| raise NotImplementedError() | ||
|
|
||
| self._test_file = fh.realpath() | ||
| @pytest.fixture(autouse=True) | ||
| def create_test_file(self, gzip_test_input: Tuple[bytes, bytes, bool], tmp_path: Path): | ||
| self.INITIAL_DATA, self.EXPECTED_REPACKED_DATA, self.EXPECT_PIGZ = gzip_test_input | ||
| gzip_path = tmp_path / "test.gz" | ||
| self.write_gzip(gzip_path) | ||
| self._test_file = gzip_path.resolve() | ||
|
|
||
| async def test_unpack_modify_pack(self, ofrak_context: OFRAKContext): | ||
| with patch("asyncio.create_subprocess_exec", wraps=create_subprocess_exec) as mock_exec: | ||
| if self.EXPECT_PIGZ: | ||
| await super().test_unpack_modify_pack(ofrak_context) | ||
| assert any( | ||
| args[0][0] == "pigz" and args[0][1] == "-c" for args in mock_exec.call_args_list | ||
| ) | ||
| else: | ||
| await super().test_unpack_modify_pack(ofrak_context) | ||
| mock_exec.assert_not_called() | ||
|
|
||
| async def verify(self, repacked_root_resource: Resource): | ||
| patched_gzip_file = GzipFile(fileobj=BytesIO(await repacked_root_resource.get_data())) | ||
| patched_decompressed_data = patched_gzip_file.read() | ||
| patched_decompressed_data = gzip.decompress(await repacked_root_resource.get_data()) | ||
| assert patched_decompressed_data == self.EXPECTED_REPACKED_DATA | ||
|
|
||
|
|
||
| class TestGzipUnpackWithTrailingBytes(UnpackModifyPackPattern): | ||
| EXPECTED_TAG = GzipData | ||
| INITIAL_DATA = b"Hello World" | ||
| EXPECTED_DATA = INITIAL_DATA # Change expected when modifier is created | ||
| INNER_FILENAME = "hello.bin" | ||
| GZIP_FILENAME = "hello.bin.gz" | ||
|
|
||
| async def create_root_resource(self, ofrak_context: OFRAKContext) -> Resource: | ||
| with tempfile.TemporaryDirectory() as d: | ||
| file_path = os.path.join(d, self.INNER_FILENAME) | ||
| with open(file_path, "wb") as f: | ||
| f.write(self.INITIAL_DATA) | ||
|
|
||
| gzip_path = os.path.join(d, self.GZIP_FILENAME) | ||
| gzip_command = ["pigz", file_path] | ||
| subprocess.run(gzip_command, check=True, capture_output=True) | ||
|
|
||
| # Add trailing bytes | ||
| with open(gzip_path, "ab") as a: | ||
| a.write(b"\xDE\xAD\xBE\xEF") | ||
| a.close() | ||
| return await ofrak_context.create_root_resource_from_file(gzip_path) | ||
|
|
||
| async def unpack(self, root_resource: Resource) -> None: | ||
| await root_resource.unpack_recursively() | ||
|
|
||
| async def modify(self, root_resource: Resource) -> None: | ||
| pass | ||
|
|
||
| async def repack(self, root_resource: Resource) -> None: | ||
| pass | ||
|
|
||
| async def verify(self, root_resource: Resource) -> None: | ||
| gzip_data = await root_resource.get_data() | ||
| with tempfile.TemporaryDirectory() as d: | ||
| gzip_path = os.path.join(d, self.GZIP_FILENAME) | ||
| with open(gzip_path, "wb") as f: | ||
| f.write(gzip_data) | ||
|
|
||
| gunzip_command = ["pigz", "-d", "-c", gzip_path] | ||
| try: | ||
| result = subprocess.run(gunzip_command, check=True, capture_output=True) | ||
| data = result.stdout | ||
| except subprocess.CalledProcessError as e: | ||
| if e.returncode == 2 or e.returncode == -2: | ||
| data = e.stdout | ||
| else: | ||
| raise | ||
|
|
||
| assert data == self.EXPECTED_DATA | ||
| class TestGzipUnpackModifyPack(GzipUnpackModifyPackPattern): | ||
| def write_gzip(self, gzip_path: Path): | ||
| with gzip.GzipFile(gzip_path, mode="w") as gzip_file: | ||
| gzip_file.write(self.INITIAL_DATA) | ||
|
|
||
|
|
||
| class TestGzipWithMultipleMembersUnpackModifyPack(GzipUnpackModifyPackPattern): | ||
| def write_gzip(self, gzip_path: Path): | ||
| middle = len(self.INITIAL_DATA) // 2 | ||
| with gzip.GzipFile(gzip_path, mode="w") as gzip_file: | ||
| gzip_file.write(self.INITIAL_DATA[:middle]) | ||
|
|
||
| with gzip.GzipFile(gzip_path, mode="a") as gzip_file: | ||
| gzip_file.write(self.INITIAL_DATA[middle:]) | ||
|
|
||
|
|
||
| class TestGzipWithTrailingBytesUnpackModifyPack(GzipUnpackModifyPackPattern): | ||
| def write_gzip(self, gzip_path: Path): | ||
| with gzip.GzipFile(gzip_path, mode="w") as gzip_file: | ||
| gzip_file.write(self.INITIAL_DATA) | ||
|
alchzh marked this conversation as resolved.
|
||
|
|
||
| with open(gzip_path, "ab") as raw_file: | ||
| raw_file.write(b"\xDE\xAD\xBE\xEF") | ||
|
|
||
|
|
||
| async def test_corrupted_gzip_fail( | ||
| gzip_test_input: Tuple[bytes, bytes, bool], ofrak_context: OFRAKContext | ||
| ): | ||
| initial_data = gzip_test_input[0] | ||
| corrupted_data = bytearray(gzip.compress(initial_data)) | ||
| corrupted_data[10] = 255 | ||
| resource = await ofrak_context.create_root_resource("corrupted.gz", data=bytes(corrupted_data)) | ||
| with pytest.raises((zlib.error, ComponentSubprocessError)): | ||
| await resource.unpack() | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.