Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 60 additions & 14 deletions smpmgr/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from smp.os_management import OS_MGMT_RET_RC
from smpclient.generics import error, error_v1, error_v2, success
from smpclient.mcuboot import IMAGE_TLV, ImageInfo, TLVNotFound
from smpclient.requests.image_management import ImageStatesWrite
from smpclient.requests.image_management import ImageStatesRead, ImageStatesWrite
from smpclient.requests.os_management import ResetWrite
from typing_extensions import Annotated, assert_never

Expand Down Expand Up @@ -169,37 +169,83 @@ def upgrade(
"(or some other mechanism).",
),
] = False,
bypass_inspect: Annotated[
bool,
typer.Option(
"--bypass-inspect",
help="Skip local MCUboot image inspection and read the image hash from the device "
"instead of extracting it from the file. "
"This is useful when uploading images that are not in MCUboot format, such as "
"custom bootloader formats (e.g., NXP's SB3.1) where the hash may be calculated "
"differently (e.g., over a specific block rather than the entire binary). "
"[bold red]WARNING[/bold red]: When using this option, the responsibility for "
"validating image integrity is placed entirely on the device's bootloader. "
"If the bootloader does not verify the image, corrupted firmware could be uploaded "
"and marked as valid. "
"It's assumed the image format encodes some sort of integrity check "
"(e.g., CRC or hash)."
"[bold red]Only use this option if your bootloader performs its own image integrity "
"validation.[/bold red]",
),
] = False,
) -> None:
"""Upload a FW image, mark it for next boot, and reset the device."""

try:
image_info = ImageInfo.load_file(str(file))
logger.info(str(image_info))
except Exception as e:
typer.echo(f"Inspection of FW image failed: {e}")
raise typer.Exit(code=1)
if not bypass_inspect:
try:
image_info = ImageInfo.load_file(str(file))
logger.info(str(image_info))
except Exception as e:
typer.echo(f"Inspection of FW image failed: {e}")
raise typer.Exit(code=1)

try:
image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256)
logger.info(f"IMAGE_TLV_SHA256: {image_tlv_sha256}")
except TLVNotFound:
typer.echo("Could not find IMAGE_TLV_SHA256 in image.")
raise typer.Exit(code=1)
try:
image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256)
logger.info(f"IMAGE_TLV_SHA256: {image_tlv_sha256}")
except TLVNotFound:
typer.echo("Could not find IMAGE_TLV_SHA256 in image.")
raise typer.Exit(code=1)

options = cast(Options, ctx.obj)
smpclient = get_smpclient(options)

async def f() -> None:
image_hash: bytes | None = None
await connect_with_spinner(smpclient, options.timeout)

with open(file, "rb") as f:
await upload_with_progress_bar(smpclient, f, slot)

if slot != 0 or confirm:
if bypass_inspect:
# Read hash from device since we skipped local image inspection
r = await smp_request(
smpclient, options, ImageStatesRead(), "Waiting for image states..."
)

if error(r):
print(r)
raise typer.Exit(code=1)
elif success(r):
if len(r.images) == 0:
print("No images on device!")
raise typer.Exit(code=1)
for image in r.images:
if image.slot == slot:
image_hash = image.hash
break
if image_hash is None:
print(f"Image with slot {slot} not found!")
raise typer.Exit(code=1)
else:
assert_never(r)
else:
image_hash = image_tlv_sha256.value

image_states_response = await smp_request(
smpclient,
options,
ImageStatesWrite(hash=image_tlv_sha256.value, confirm=confirm),
ImageStatesWrite(hash=image_hash, confirm=confirm),
"Marking uploaded image for permanent upgrade..."
if confirm
else "Marking uploaded image for test upgrade...",
Expand Down