diff --git a/smpmgr/main.py b/smpmgr/main.py index 6ed0cc1..dc41a56 100644 --- a/smpmgr/main.py +++ b/smpmgr/main.py @@ -14,7 +14,7 @@ from smp.os_management import OS_MGMT_RET_RC from smpclient.generics import error, error_v1, error_v2, success from smpclient.mcuboot import IMAGE_TLV, ImageInfo, TLVNotFound -from smpclient.requests.image_management import ImageStatesWrite +from smpclient.requests.image_management import ImageStatesRead, ImageStatesWrite from smpclient.requests.os_management import ResetWrite from typing_extensions import Annotated, assert_never @@ -169,37 +169,83 @@ def upgrade( "(or some other mechanism).", ), ] = False, + bypass_inspect: Annotated[ + bool, + typer.Option( + "--bypass-inspect", + help="Skip local MCUboot image inspection and read the image hash from the device " + "instead of extracting it from the file. " + "This is useful when uploading images that are not in MCUboot format, such as " + "custom bootloader formats (e.g., NXP's SB3.1) where the hash may be calculated " + "differently (e.g., over a specific block rather than the entire binary). " + "[bold red]WARNING[/bold red]: When using this option, the responsibility for " + "validating image integrity is placed entirely on the device's bootloader. " + "If the bootloader does not verify the image, corrupted firmware could be uploaded " + "and marked as valid. " + "It's assumed the image format encodes some sort of integrity check " + "(e.g., CRC or hash)." + "[bold red]Only use this option if your bootloader performs its own image integrity " + "validation.[/bold red]", + ), + ] = False, ) -> None: """Upload a FW image, mark it for next boot, and reset the device.""" - try: - image_info = ImageInfo.load_file(str(file)) - logger.info(str(image_info)) - except Exception as e: - typer.echo(f"Inspection of FW image failed: {e}") - raise typer.Exit(code=1) + if not bypass_inspect: + try: + image_info = ImageInfo.load_file(str(file)) + logger.info(str(image_info)) + except Exception as e: + typer.echo(f"Inspection of FW image failed: {e}") + raise typer.Exit(code=1) - try: - image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256) - logger.info(f"IMAGE_TLV_SHA256: {image_tlv_sha256}") - except TLVNotFound: - typer.echo("Could not find IMAGE_TLV_SHA256 in image.") - raise typer.Exit(code=1) + try: + image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256) + logger.info(f"IMAGE_TLV_SHA256: {image_tlv_sha256}") + except TLVNotFound: + typer.echo("Could not find IMAGE_TLV_SHA256 in image.") + raise typer.Exit(code=1) options = cast(Options, ctx.obj) smpclient = get_smpclient(options) async def f() -> None: + image_hash: bytes | None = None await connect_with_spinner(smpclient, options.timeout) with open(file, "rb") as f: await upload_with_progress_bar(smpclient, f, slot) if slot != 0 or confirm: + if bypass_inspect: + # Read hash from device since we skipped local image inspection + r = await smp_request( + smpclient, options, ImageStatesRead(), "Waiting for image states..." + ) + + if error(r): + print(r) + raise typer.Exit(code=1) + elif success(r): + if len(r.images) == 0: + print("No images on device!") + raise typer.Exit(code=1) + for image in r.images: + if image.slot == slot: + image_hash = image.hash + break + if image_hash is None: + print(f"Image with slot {slot} not found!") + raise typer.Exit(code=1) + else: + assert_never(r) + else: + image_hash = image_tlv_sha256.value + image_states_response = await smp_request( smpclient, options, - ImageStatesWrite(hash=image_tlv_sha256.value, confirm=confirm), + ImageStatesWrite(hash=image_hash, confirm=confirm), "Marking uploaded image for permanent upgrade..." if confirm else "Marking uploaded image for test upgrade...",