Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions src/apm_cli/bundle/unpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import List
from typing import Dict, List

from ..deps.lockfile import LockFile

Expand All @@ -18,6 +18,8 @@ class UnpackResult:
extracted_dir: Path
files: List[str] = field(default_factory=list)
verified: bool = False
dependency_files: Dict[str, List[str]] = field(default_factory=dict)
skipped_count: int = 0


def unpack_bundle(
Expand Down Expand Up @@ -93,18 +95,20 @@ def unpack_bundle(
"apm.lock in the bundle could not be parsed — the bundle may be corrupt."
)

# Collect all deployed_files from lockfile
all_deployed: list[str] = []
for dep in lockfile.get_all_dependencies():
all_deployed.extend(dep.deployed_files)

# Deduplicate
# Collect deployed_files per dependency and deduplicated global list
dep_file_map: Dict[str, List[str]] = {}
seen: set[str] = set()
unique_files: list[str] = []
for f in all_deployed:
if f not in seen:
seen.add(f)
unique_files.append(f)
for dep in lockfile.get_all_dependencies():
dep_key = dep.get_unique_key()
dep_files: list[str] = []
for f in dep.deployed_files:
dep_files.append(f)
if f not in seen:
seen.add(f)
unique_files.append(f)
if dep_files:
dep_file_map[dep_key] = dep_files

# 3. Verify completeness
verified = True
Expand All @@ -128,11 +132,13 @@ def unpack_bundle(
extracted_dir=bundle_path,
files=unique_files,
verified=verified,
dependency_files=dep_file_map,
)

# 4. Copy target files to output_dir (additive, no deletes)
output_dir = Path(output_dir)
output_dir_resolved = output_dir.resolve()
skipped = 0
for rel_path in unique_files:
# Guard against absolute paths or path-traversal entries in deployed_files
p = Path(rel_path)
Expand All @@ -147,6 +153,7 @@ def unpack_bundle(
)
src = source_dir / rel_path
if not src.exists():
skipped += 1
continue # skip_verify may allow missing files
if src.is_dir():
shutil.copytree(src, dest, dirs_exist_ok=True)
Expand All @@ -158,6 +165,8 @@ def unpack_bundle(
extracted_dir=bundle_path,
files=unique_files,
verified=verified,
dependency_files=dep_file_map,
skipped_count=skipped,
)
finally:
# Clean up temp dir if we created one
Expand Down
25 changes: 22 additions & 3 deletions src/apm_cli/commands/pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from ..bundle.packer import pack_bundle
from ..bundle.unpacker import unpack_bundle
from ..utils.console import _rich_success, _rich_error, _rich_info, _rich_warning
from ..utils.console import _rich_echo, _rich_success, _rich_error, _rich_info, _rich_warning


@click.command(name="pack", help="Create a self-contained bundle from installed dependencies")
Expand Down Expand Up @@ -82,6 +82,8 @@ def pack_cmd(ctx, fmt, target, archive, output, dry_run):
def unpack_cmd(ctx, bundle_path, output, skip_verify, dry_run):
"""Extract an APM bundle into the project."""
try:
_rich_info(f"Unpacking {bundle_path} → {output}")

result = unpack_bundle(
bundle_path=Path(bundle_path),
output_dir=Path(output),
Expand All @@ -93,18 +95,35 @@ def unpack_cmd(ctx, bundle_path, output, skip_verify, dry_run):
_rich_info("Dry run — no files written")
if result.files:
_rich_info(f"Would unpack {len(result.files)} file(s):")
for f in result.files:
click.echo(f" {f}")
_log_unpack_file_list(result)
else:
_rich_warning("No files in bundle")
return

if not result.files:
_rich_warning("No files were unpacked")
else:
_log_unpack_file_list(result)
if result.skipped_count > 0:
_rich_warning(
f" {result.skipped_count} file(s) skipped (missing from bundle)"
)
verified_msg = " (verified)" if result.verified else ""
_rich_success(f"Unpacked {len(result.files)} file(s){verified_msg}")

except (FileNotFoundError, ValueError) as exc:
_rich_error(str(exc))
sys.exit(1)


def _log_unpack_file_list(result):
"""Log unpacked files grouped by dependency, using tree-style output."""
if result.dependency_files:
for dep_name, dep_files in result.dependency_files.items():
_rich_echo(f" {dep_name}", color="cyan")
for f in dep_files:
_rich_echo(f" └─ {f}", color="white")
else:
# Fallback: flat file list (no dependency info)
for f in result.files:
_rich_echo(f" └─ {f}", color="white")
Loading
Loading