Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ofrak_components/Dockerstub
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ RUN apt-get -y update && \
qemu \
qemu-user-static \
u-boot-tools \
unar
unar \
zstd

# Install apktool and uber-apk-signer
RUN apt-get -y update && apt-get -y install openjdk-11-jdk
Expand Down
97 changes: 97 additions & 0 deletions ofrak_components/ofrak_components/zstd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import subprocess
import tempfile
from dataclasses import dataclass
Comment thread
rbs-jacob marked this conversation as resolved.
from typing import Optional

from ofrak import Packer, Unpacker, Resource
from ofrak.component.packer import PackerError
from ofrak.component.unpacker import UnpackerError
from ofrak.core import (
GenericBinary,
format_called_process_error,
MagicMimeIdentifier,
MagicDescriptionIdentifier,
)
from ofrak.model.component_model import CC, ComponentConfig
from ofrak_type.range import Range


class ZstdData(GenericBinary):
"""
A zstd binary blob.
"""

async def get_child(self) -> GenericBinary:
return await self.resource.get_only_child_as_view(GenericBinary)


@dataclass
class ZstdPackerConfig(ComponentConfig):
compression_level: int
Comment thread
rbs-jacob marked this conversation as resolved.


class ZstdUnpacker(Unpacker[None]):
"""
Unpack (decompress) a zstd file.
"""

id = b"ZstdUnpacker"
targets = (ZstdData,)
children = (GenericBinary,)

async def unpack(self, resource: Resource, config: CC) -> None:
with tempfile.NamedTemporaryFile(suffix=".zstd") as compressed_file:
compressed_file.write(await resource.get_data())
compressed_file.flush()
output_filename = tempfile.mktemp()

command = ["zstd", "-d", "-k", compressed_file.name, "-o", output_filename]
try:
subprocess.run(command, check=True)
with open(output_filename, "rb") as f:
result = f.read()
except subprocess.CalledProcessError as e:
raise UnpackerError(format_called_process_error(e))

await resource.create_child(tags=(GenericBinary,), data=result)


class ZstdPacker(Packer[ZstdPackerConfig]):
"""
Pack data into a compressed zstd file.
"""

targets = (ZstdData,)

async def pack(self, resource: Resource, config: Optional[ZstdPackerConfig] = None):
if config is None:
config = ZstdPackerConfig(compression_level=19)
zstd_view = await resource.view_as(ZstdData)
child_file = await zstd_view.get_child()
uncompressed_data = await child_file.resource.get_data()

with tempfile.NamedTemporaryFile() as uncompressed_file:
uncompressed_file.write(uncompressed_data)
uncompressed_file.flush()
output_filename = tempfile.mktemp()

command = ["zstd", "-T0", f"-{config.compression_level}"]
if config.compression_level > 19:
command.append("--ultra")
command.extend([uncompressed_file.name, "-o", output_filename])
try:
subprocess.run(command, check=True)
with open(output_filename, "rb") as f:
result = f.read()
except subprocess.CalledProcessError as e:
raise PackerError(format_called_process_error(e))

compressed_data = result
original_size = await zstd_view.resource.get_data_length()
resource.queue_patch(Range(0, original_size), compressed_data)


MagicMimeIdentifier.register(ZstdData, "application/x-zstd")
MagicDescriptionIdentifier.register(
ZstdData, lambda s: s.lower().startswith("zstandard compressed data")
)
45 changes: 45 additions & 0 deletions ofrak_components/ofrak_components_test/test_zstd_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import subprocess
import tempfile

import pytest

from ofrak.core.filesystem import format_called_process_error
from ofrak.resource import Resource
from pytest_ofrak.patterns.compressed_filesystem_unpack_modify_pack import (
CompressedFileUnpackModifyPackPattern,
)


class TestZstdUnpackModifyPack(CompressedFileUnpackModifyPackPattern):
@pytest.fixture(autouse=True)
def create_test_file(self, tmpdir):
d = tmpdir.mkdir("zstd")
uncompressed_filename = d.join("hello.txt").realpath()
with open(uncompressed_filename, "wb") as f:
f.write(self.INITIAL_DATA)

compressed_filename = d.join("hello.zstd").realpath()
command = ["zstd", "-19", uncompressed_filename, "-o", compressed_filename]
try:
subprocess.run(command, check=True, capture_output=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(format_called_process_error(e))

self._test_file = compressed_filename

async def verify(self, repacked_root_resource: Resource) -> None:
compressed_data = await repacked_root_resource.get_data()
with tempfile.NamedTemporaryFile(suffix=".zstd") as compressed_file:
compressed_file.write(compressed_data)
compressed_file.flush()
output_filename = tempfile.mktemp()

command = ["zstd", "-d", "-k", compressed_file.name, "-o", output_filename]
try:
subprocess.run(command, check=True, capture_output=True)
with open(output_filename, "rb") as f:
result = f.read()
except subprocess.CalledProcessError as e:
raise RuntimeError(format_called_process_error(e))

assert result == self.EXPECTED_REPACKED_DATA