From 4a5d8fe538a2837527b40db3710f6a75e034d96e Mon Sep 17 00:00:00 2001 From: magi-morph Date: Fri, 13 Mar 2026 18:43:40 +0000 Subject: [PATCH 1/3] sdk: add snapshot ttl support --- morphcloud/api.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/morphcloud/api.py b/morphcloud/api.py index bcd42f9..d20165c 100644 --- a/morphcloud/api.py +++ b/morphcloud/api.py @@ -657,6 +657,7 @@ def create( vcpus: typing.Optional[int] = None, memory: typing.Optional[int] = None, disk_size: typing.Optional[int] = None, + ttl_seconds: typing.Optional[int] = None, digest: typing.Optional[str] = None, metadata: typing.Optional[typing.Dict[str, str]] = None, ) -> Snapshot: @@ -667,6 +668,7 @@ def create( vcpus: The number of virtual CPUs for the snapshot. memory: The amount of memory (in MB) for the snapshot. disk_size: The size of the snapshot (in MB). + ttl_seconds: Optional time-to-live in seconds for the snapshot. digest: Optional digest for the snapshot. If provided, it will be used to identify the snapshot. If a snapshot with the same digest already exists, it will be returned instead of creating a new one. metadata: Optional metadata to attach to the snapshot.""" body = {} @@ -678,6 +680,8 @@ def create( body["memory"] = memory if disk_size is not None: body["disk_size"] = disk_size + if ttl_seconds is not None: + body["ttl_seconds"] = ttl_seconds if digest is not None: body["digest"] = digest if metadata is not None: @@ -693,6 +697,7 @@ async def acreate( vcpus: typing.Optional[int] = None, memory: typing.Optional[int] = None, disk_size: typing.Optional[int] = None, + ttl_seconds: typing.Optional[int] = None, digest: typing.Optional[str] = None, metadata: typing.Optional[typing.Dict[str, str]] = None, ) -> Snapshot: @@ -703,6 +708,7 @@ async def acreate( vcpus: The number of virtual CPUs for the snapshot. memory: The amount of memory (in MB) for the snapshot. disk_size: The size of the snapshot (in MB). + ttl_seconds: Optional time-to-live in seconds for the snapshot. digest: Optional digest for the snapshot. If provided, it will be used to identify the snapshot. If a snapshot with the same digest already exists, it will be returned instead of creating a new one. metadata: Optional metadata to attach to the snapshot.""" body = {} @@ -714,6 +720,8 @@ async def acreate( body["memory"] = memory if disk_size is not None: body["disk_size"] = disk_size + if ttl_seconds is not None: + body["ttl_seconds"] = ttl_seconds if digest is not None: body["digest"] = digest if metadata is not None: @@ -749,6 +757,9 @@ class Snapshot(BaseModel): metadata: typing.Dict[str, str] = Field( default_factory=dict, description="User provided metadata" ) + ttl: typing.Optional[TTL] = Field( + default=None, description="Time-To-Live configuration for the snapshot" + ) _api: SnapshotAPI = PrivateAttr() From 966fd7df8b0379919520a717005f5faea2abeb7b Mon Sep 17 00:00:00 2001 From: magi-morph Date: Fri, 20 Mar 2026 20:49:52 +0000 Subject: [PATCH 2/3] Route snapshot touch through TTL update --- morphcloud/api.py | 474 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 328 insertions(+), 146 deletions(-) diff --git a/morphcloud/api.py b/morphcloud/api.py index d20165c..881b520 100644 --- a/morphcloud/api.py +++ b/morphcloud/api.py @@ -11,6 +11,7 @@ import typing from concurrent.futures import ThreadPoolExecutor, as_completed from functools import lru_cache +from urllib.parse import urlparse, urlunparse import httpx from pydantic import BaseModel, Field, PrivateAttr @@ -108,6 +109,42 @@ class TTL(BaseModel): ) +DEFAULT_CHAIN_SNAPSHOT_TTL_SECONDS = 7 * 24 * 60 * 60 + + +def _chain_snapshot_ttl_seconds() -> int: + raw = os.getenv("MORPH_CHAIN_SNAPSHOT_TTL_SECONDS", "").strip() + if raw: + try: + parsed = int(raw) + if parsed > 0: + return parsed + except Exception: + pass + return DEFAULT_CHAIN_SNAPSHOT_TTL_SECONDS + + +def _exec_service_proxy_base_url(base_url: str) -> typing.Optional[str]: + try: + parsed = urlparse(base_url) + except Exception: + return None + host = parsed.hostname or "" + if not host.startswith("api-") or not host.endswith(".http.cloud.morph.so"): + return None + service_host = "service-" + host[len("api-") :] + port = f":{parsed.port}" if parsed.port else "" + netloc = f"{service_host}{port}" + path = parsed.path.rstrip("/") + if not path: + path = "/service/api" + elif path.endswith("/api"): + path = "/service" + path + else: + path = "/service" + (path if path.startswith("/") else f"/{path}") + return urlunparse(parsed._replace(netloc=netloc, path=path)) + + class WakeOn(BaseModel): """Represents the wake-on-event configuration for an instance.""" @@ -796,6 +833,45 @@ async def aset_metadata(self, metadata: typing.Dict[str, str]) -> None: response.raise_for_status() await self._refresh_async() + def set_ttl(self, ttl_seconds: typing.Optional[int]) -> None: + """Set or clear the snapshot TTL.""" + payload = {"ttl_seconds": ttl_seconds} + response = self._api._client._http_client.post( + f"/snapshot/{self.id}/ttl", + json=payload, + ) + response.raise_for_status() + self._refresh() + + async def aset_ttl(self, ttl_seconds: typing.Optional[int]) -> None: + payload = {"ttl_seconds": ttl_seconds} + response = await self._api._client._async_http_client.post( + f"/snapshot/{self.id}/ttl", + json=payload, + ) + response.raise_for_status() + await self._refresh_async() + + def clear_ttl(self) -> None: + """Clear the snapshot TTL (never expire).""" + self.set_ttl(None) + + async def aclear_ttl(self) -> None: + await self.aset_ttl(None) + + def touch(self) -> None: + """Heartbeat the snapshot (extends inactivity-based TTL).""" + ttl_seconds = None + if self.ttl is not None: + ttl_seconds = self.ttl.ttl_seconds + self.set_ttl(ttl_seconds) + + async def atouch(self) -> None: + ttl_seconds = None + if self.ttl is not None: + ttl_seconds = self.ttl.ttl_seconds + await self.aset_ttl(ttl_seconds) + def _refresh(self) -> None: refreshed = self._api.get(self.id) updated = type(self).model_validate(refreshed.model_dump()) @@ -1001,12 +1077,19 @@ def _cache_effect( # 5) Check if there's already a snapshot with that digest candidates = self._api.list(digest=new_chain_hash) if candidates: + cached = candidates[0] + try: + cached.touch() + except Exception as exc: + console.print( + f"[yellow]Warning: failed to touch cached snapshot {cached.id}: {exc}[/yellow]" + ) console.print( f"[bold green]✅ Using cached snapshot[/bold green] " f"with digest [white]{new_chain_hash}[/white] " f"for effect [yellow]{fn.__name__}[/yellow]." ) - return candidates[0] + return cached # 6) Otherwise, apply the effect on a fresh instance from this snapshot console.print( @@ -1018,7 +1101,10 @@ def _cache_effect( instance.wait_until_ready(timeout=300) fn(instance, *args, **kwargs) # Actually run the effect # 7) Snapshot the instance, passing digest=new_chain_hash to store the chain hash - new_snapshot = instance.snapshot(digest=new_chain_hash) + new_snapshot = instance.snapshot( + digest=new_chain_hash, + ttl_seconds=_chain_snapshot_ttl_seconds(), + ) finally: instance.stop() @@ -1348,6 +1434,12 @@ def _scan_cached_prefix_with_digests( if candidates: cached_len = i + 1 last_snap = candidates[0] + try: + last_snap.touch() + except Exception as exc: + console.print( + f"[yellow]Warning: failed to touch cached snapshot {last_snap.id}: {exc}[/yellow]" + ) else: break @@ -1429,7 +1521,10 @@ def build( f"[magenta]• Snapshotting step {idx + 1} with digest " f"[white]{desired_digest}[/white] ...[/magenta]" ) - current_snapshot = instance.snapshot(digest=desired_digest) + current_snapshot = instance.snapshot( + digest=desired_digest, + ttl_seconds=_chain_snapshot_ttl_seconds(), + ) assert current_snapshot is not None console.print( @@ -1477,6 +1572,12 @@ async def abuild( if candidates: cached_len = i + 1 last_cached = candidates[0] + try: + await last_cached.atouch() + except Exception as exc: + console.print( + f"[yellow]Warning: failed to touch cached snapshot {last_cached.id}: {exc}[/yellow]" + ) else: break @@ -1531,7 +1632,10 @@ async def abuild( f"[magenta]• Snapshotting step {idx + 1} with digest " f"[white]{desired_digest}[/white] ...[/magenta]" ) - current_snapshot = await instance.asnapshot(digest=desired_digest) + current_snapshot = await instance.asnapshot( + digest=desired_digest, + ttl_seconds=_chain_snapshot_ttl_seconds(), + ) assert current_snapshot is not None console.print( @@ -2794,13 +2898,17 @@ def snapshot( self, digest: typing.Optional[str] = None, metadata: typing.Optional[typing.Dict[str, str]] = None, + ttl_seconds: typing.Optional[int] = None, ) -> Snapshot: """Save the instance as a snapshot.""" params = {} if digest is not None: params["digest"] = digest + body: typing.Dict[str, typing.Any] = {"metadata": metadata} + if ttl_seconds is not None: + body["ttl_seconds"] = ttl_seconds response = self._api._client._http_client.post( - f"/instance/{self.id}/snapshot", params=params, json=dict(metadata=metadata) + f"/instance/{self.id}/snapshot", params=params, json=body ) return Snapshot.model_validate(response.json())._set_api( self._api._client.snapshots, @@ -2810,13 +2918,17 @@ async def asnapshot( self, digest: typing.Optional[str] = None, metadata: typing.Optional[typing.Dict[str, str]] = None, + ttl_seconds: typing.Optional[int] = None, ) -> Snapshot: """Save the instance as a snapshot.""" params = {} if digest is not None: params = {"digest": digest} + body: typing.Dict[str, typing.Any] = {"metadata": metadata} + if ttl_seconds is not None: + body["ttl_seconds"] = ttl_seconds response = await self._api._client._async_http_client.post( - f"/instance/{self.id}/snapshot", params=params, json=dict(metadata=metadata) + f"/instance/{self.id}/snapshot", params=params, json=body ) return Snapshot.model_validate(response.json())._set_api( self._api._client.snapshots @@ -3058,6 +3170,22 @@ def exec( timeout=timeout, ) return InstanceExecResponse.model_validate(response.json()) + except ApiError as e: + alt_base = _exec_service_proxy_base_url( + self._api._client.base_url + ) + if ( + e.status_code == 401 + and alt_base + and alt_base != self._api._client.base_url + ): + response = self._api._client._http_client.post( + f"{alt_base.rstrip('/')}/instance/{self.id}/exec", + json={"command": command}, + timeout=timeout, + ) + return InstanceExecResponse.model_validate(response.json()) + raise except Exception as e: # Convert HTTP timeout errors to more user-friendly TimeoutError if isinstance(e, (httpx.ReadTimeout, httpx.TimeoutException)): @@ -3092,82 +3220,101 @@ def _exec_streaming( "Content-Type": "application/json", } - # Accumulate output for final response - stdout_chunks = [] - stderr_chunks = [] - exit_code = 0 + def _run_stream(url: str) -> InstanceExecResponse: + # Accumulate output for final response + stdout_chunks: typing.List[str] = [] + stderr_chunks: typing.List[str] = [] + exit_code = 0 - # Make streaming request - try: - with self._api._client._http_client.stream( - "POST", - f"{self._api._client.base_url}/instance/{self.id}/exec/sse", - json={"command": command}, - headers=headers, - timeout=timeout, - ) as response: - response.raise_for_status() - - for line in response.iter_lines(): - if not line.strip(): - continue + # Make streaming request + try: + with self._api._client._http_client.stream( + "POST", + url, + json={"command": command}, + headers=headers, + timeout=timeout, + ) as response: + response.raise_for_status() - # Skip lines that don't start with 'data: ' - if not line.startswith("data: "): - continue + for line in response.iter_lines(): + if not line.strip(): + continue - data_content = line[6:] # Remove 'data: ' prefix - - # Check for stream end - if data_content.strip() == "[DONE]": - break - - try: - event = json.loads(data_content) - event_type = event.get("type") - content = event.get("content", "") - - if event_type == "stdout": - stdout_chunks.append(content) - if on_stdout: - try: - on_stdout(content) - except Exception: - # Log callback errors but don't interrupt stream - pass - - elif event_type == "stderr": - stderr_chunks.append(content) - if on_stderr: - try: - on_stderr(content) - except Exception: - # Log callback errors but don't interrupt stream - pass - - elif event_type == "exit_code": - exit_code = int(content) - - except (json.JSONDecodeError, KeyError, ValueError): - # Skip malformed events and continue processing - continue - except Exception as e: - # Convert HTTP timeout errors to more user-friendly TimeoutError - if isinstance(e, (httpx.ReadTimeout, httpx.TimeoutException)): - raise TimeoutError( - f"Command execution timed out after {timeout} seconds" - ) from e - # Re-raise other exceptions as-is - raise + # Skip lines that don't start with 'data: ' + if not line.startswith("data: "): + continue - return InstanceExecResponse.model_validate( - { - "exit_code": exit_code, - "stdout": "".join(stdout_chunks), - "stderr": "".join(stderr_chunks), - } + data_content = line[6:] # Remove 'data: ' prefix + + # Check for stream end + if data_content.strip() == "[DONE]": + break + + try: + event = json.loads(data_content) + event_type = event.get("type") + content = event.get("content", "") + + if event_type == "stdout": + stdout_chunks.append(content) + if on_stdout: + try: + on_stdout(content) + except Exception: + # Log callback errors but don't interrupt stream + pass + + elif event_type == "stderr": + stderr_chunks.append(content) + if on_stderr: + try: + on_stderr(content) + except Exception: + # Log callback errors but don't interrupt stream + pass + + elif event_type == "exit_code": + exit_code = int(content) + + except (json.JSONDecodeError, KeyError, ValueError): + # Skip malformed events and continue processing + continue + except Exception as e: + # Convert HTTP timeout errors to more user-friendly TimeoutError + if isinstance(e, (httpx.ReadTimeout, httpx.TimeoutException)): + raise TimeoutError( + f"Command execution timed out after {timeout} seconds" + ) from e + # Re-raise other exceptions as-is + raise + + return InstanceExecResponse.model_validate( + { + "exit_code": exit_code, + "stdout": "".join(stdout_chunks), + "stderr": "".join(stderr_chunks), + } + ) + + primary_url = f"{self._api._client.base_url}/instance/{self.id}/exec/sse" + alt_base = _exec_service_proxy_base_url(self._api._client.base_url) + alt_url = ( + f"{alt_base.rstrip('/')}/instance/{self.id}/exec/sse" if alt_base else None ) + try: + return _run_stream(primary_url) + except httpx.HTTPStatusError as e: + if ( + e.response is not None + and e.response.status_code == 401 + and alt_url + and alt_url != primary_url + ): + return _run_stream(alt_url) + raise + async def aexec( self, command: typing.Union[str, typing.List[str]], @@ -3219,6 +3366,22 @@ async def aexec( timeout=timeout, ) return InstanceExecResponse.model_validate(response.json()) + except ApiError as e: + alt_base = _exec_service_proxy_base_url( + self._api._client.base_url + ) + if ( + e.status_code == 401 + and alt_base + and alt_base != self._api._client.base_url + ): + response = await self._api._client._async_http_client.post( + f"{alt_base.rstrip('/')}/instance/{self.id}/exec", + json={"command": command}, + timeout=timeout, + ) + return InstanceExecResponse.model_validate(response.json()) + raise except Exception as e: # Convert HTTP timeout errors to more user-friendly TimeoutError if isinstance(e, (httpx.ReadTimeout, httpx.TimeoutException)): @@ -3253,82 +3416,101 @@ async def _aexec_streaming( "Content-Type": "application/json", } - # Accumulate output for final response - stdout_chunks = [] - stderr_chunks = [] - exit_code = 0 + async def _run_stream(url: str) -> InstanceExecResponse: + # Accumulate output for final response + stdout_chunks: typing.List[str] = [] + stderr_chunks: typing.List[str] = [] + exit_code = 0 - # Make streaming request - try: - async with self._api._client._async_http_client.stream( - "POST", - f"/instance/{self.id}/exec/sse", - json={"command": command}, - headers=headers, - timeout=timeout, - ) as response: - response.raise_for_status() - - async for line in response.aiter_lines(): - if not line.strip(): - continue + # Make streaming request + try: + async with self._api._client._async_http_client.stream( + "POST", + url, + json={"command": command}, + headers=headers, + timeout=timeout, + ) as response: + response.raise_for_status() - # Skip lines that don't start with 'data: ' - if not line.startswith("data: "): - continue + async for line in response.aiter_lines(): + if not line.strip(): + continue - data_content = line[6:] # Remove 'data: ' prefix - - # Check for stream end - if data_content.strip() == "[DONE]": - break - - try: - event = json.loads(data_content) - event_type = event.get("type") - content = event.get("content", "") - - if event_type == "stdout": - stdout_chunks.append(content) - if on_stdout: - try: - on_stdout(content) - except Exception: - # Log callback errors but don't interrupt stream - pass - - elif event_type == "stderr": - stderr_chunks.append(content) - if on_stderr: - try: - on_stderr(content) - except Exception: - # Log callback errors but don't interrupt stream - pass - - elif event_type == "exit_code": - exit_code = int(content) - - except (json.JSONDecodeError, KeyError, ValueError): - # Skip malformed events and continue processing - continue - except Exception as e: - # Convert HTTP timeout errors to more user-friendly TimeoutError - if isinstance(e, (httpx.ReadTimeout, httpx.TimeoutException)): - raise TimeoutError( - f"Command execution timed out after {timeout} seconds" - ) from e - # Re-raise other exceptions as-is - raise + # Skip lines that don't start with 'data: ' + if not line.startswith("data: "): + continue - return InstanceExecResponse.model_validate( - { - "exit_code": exit_code, - "stdout": "".join(stdout_chunks), - "stderr": "".join(stderr_chunks), - } + data_content = line[6:] # Remove 'data: ' prefix + + # Check for stream end + if data_content.strip() == "[DONE]": + break + + try: + event = json.loads(data_content) + event_type = event.get("type") + content = event.get("content", "") + + if event_type == "stdout": + stdout_chunks.append(content) + if on_stdout: + try: + on_stdout(content) + except Exception: + # Log callback errors but don't interrupt stream + pass + + elif event_type == "stderr": + stderr_chunks.append(content) + if on_stderr: + try: + on_stderr(content) + except Exception: + # Log callback errors but don't interrupt stream + pass + + elif event_type == "exit_code": + exit_code = int(content) + + except (json.JSONDecodeError, KeyError, ValueError): + # Skip malformed events and continue processing + continue + except Exception as e: + # Convert HTTP timeout errors to more user-friendly TimeoutError + if isinstance(e, (httpx.ReadTimeout, httpx.TimeoutException)): + raise TimeoutError( + f"Command execution timed out after {timeout} seconds" + ) from e + # Re-raise other exceptions as-is + raise + + return InstanceExecResponse.model_validate( + { + "exit_code": exit_code, + "stdout": "".join(stdout_chunks), + "stderr": "".join(stderr_chunks), + } + ) + + primary_url = f"{self._api._client.base_url}/instance/{self.id}/exec/sse" + alt_base = _exec_service_proxy_base_url(self._api._client.base_url) + alt_url = ( + f"{alt_base.rstrip('/')}/instance/{self.id}/exec/sse" if alt_base else None ) + try: + return await _run_stream(primary_url) + except httpx.HTTPStatusError as e: + if ( + e.response is not None + and e.response.status_code == 401 + and alt_url + and alt_url != primary_url + ): + return await _run_stream(alt_url) + raise + def wait_until_ready(self, timeout: typing.Optional[float] = None) -> None: """Wait until the instance is ready.""" start_time = time.time() From 99b6c2f6ac2e1862ca7c6dec1e42af665ea7f6cd Mon Sep 17 00:00:00 2001 From: magi-morph Date: Fri, 20 Mar 2026 21:06:00 +0000 Subject: [PATCH 3/3] Add chain TTL defaults to experimental snapshots --- morphcloud/experimental/__init__.py | 97 +++++++++++++---------------- 1 file changed, 42 insertions(+), 55 deletions(-) diff --git a/morphcloud/experimental/__init__.py b/morphcloud/experimental/__init__.py index 0b572c4..38d8b6b 100644 --- a/morphcloud/experimental/__init__.py +++ b/morphcloud/experimental/__init__.py @@ -2,6 +2,7 @@ import hashlib import logging +import os import stat import threading import time @@ -9,7 +10,6 @@ from collections import deque from contextlib import contextmanager from pathlib import Path -from typing import Iterator, Literal, Tuple, Union import paramiko @@ -19,6 +19,20 @@ # Configure logging for the experimental module logger = logging.getLogger(__name__) +DEFAULT_CHAIN_SNAPSHOT_TTL_SECONDS = 7 * 24 * 60 * 60 + + +def _chain_snapshot_ttl_seconds() -> int: + raw = os.getenv("MORPH_CHAIN_SNAPSHOT_TTL_SECONDS", "").strip() + if raw: + try: + parsed = int(raw) + if parsed > 0: + return parsed + except Exception: + pass + return DEFAULT_CHAIN_SNAPSHOT_TTL_SECONDS + # ──────────────────────────── Logging System ─────────────────────────── # class LoggingSystem: @@ -137,64 +151,18 @@ def panel(self) -> str: # ───────────────────── Anthropic / agent setup ─────────────────── # -StreamTuple = Union[ - Tuple[Literal["stdout"], str], - Tuple[Literal["stdin"], str], - Tuple[Literal["exit_code"], int], -] - - -def ssh_stream( - ssh: paramiko.SSHClient, - command: str, - *, - encoding: str = "utf-8", - chunk_size: int = 4096, - poll: float = 0.01, -) -> Iterator[StreamTuple]: - transport = ssh.get_transport() - assert transport is not None, "SSH transport must be connected" - chan = transport.open_session() - chan.exec_command(command) - - while True: - while chan.recv_ready(): - data = chan.recv(chunk_size) - if data: - yield ("stdout", data.decode(encoding, errors="replace")) - while chan.recv_stderr_ready(): - data = chan.recv_stderr(chunk_size) - if data: - yield ("stderr", data.decode(encoding, errors="replace")) - if ( - chan.exit_status_ready() - and not chan.recv_ready() - and not chan.recv_stderr_ready() - ): - break - time.sleep(poll) - - yield ("exit_code", chan.recv_exit_status()) - chan.close() - - def instance_exec( instance, command: str, on_stdout: typing.Callable[[str], None], on_stderr: typing.Callable[[str], None], ) -> int: - with instance.ssh() as ssh: - ssh_client = ssh._client # type: ignore[attr-defined] - for msg in ssh_stream(ssh_client, command): - match msg: - case ("stdout", txt): - on_stdout(txt) - case ("stderr", txt): - on_stderr(txt) - case ("exit_code", code): - return code - raise RuntimeError("SSH stream did not yield exit code.") + response = instance.exec( + command, + on_stdout=on_stdout, + on_stderr=on_stderr, + ) + return response.exit_code client = MorphCloudClient() @@ -222,6 +190,7 @@ def create( memory: int = 4096, disk_size: int = 8192, invalidate: InvalidateFn | bool = False, + ttl_seconds: typing.Optional[int] = None, ) -> "Snapshot": logger.info( "🖼 Snapshot.create()", @@ -244,6 +213,10 @@ def create( if invalidate_fn(Snapshot(s)): s.delete() digest = f"{name}-{image_id}-{vcpus}-{memory}-{disk_size}" + if ttl_seconds is None: + ttl_seconds = _chain_snapshot_ttl_seconds() + if ttl_seconds is not None and ttl_seconds <= 0: + raise ValueError("ttl_seconds must be greater than zero") snap = client.snapshots.create( image_id=image_id, vcpus=vcpus, @@ -251,6 +224,7 @@ def create( disk_size=disk_size, digest=digest, metadata={"name": name}, + ttl_seconds=ttl_seconds, ) return cls(snap) @@ -342,6 +316,7 @@ def apply( if isinstance(invalidate, typing.Callable) else lambda _: invalidate ) + chain_ttl = _chain_snapshot_ttl_seconds() if key: digest = self.key_to_digest(key) snaps = client.snapshots.list(digest=digest) @@ -354,6 +329,12 @@ def apply( valid.append(s) snaps = valid if snaps: + try: + snaps[0].touch() + except Exception as exc: + logger.warning( + "Failed to touch cached snapshot", extra={"error": str(exc)} + ) return Snapshot(snaps[0]) if start_fn is None: @@ -370,7 +351,10 @@ def apply( inst = inst if res is None else res new_snapshot = Snapshot( - inst.snapshot(digest=self.key_to_digest(key) if key else None) + inst.snapshot( + digest=self.key_to_digest(key) if key else None, + ttl_seconds=chain_ttl, + ) ) logger.info( @@ -391,7 +375,10 @@ def apply( res = func(inst) inst = inst if res is None else res return Snapshot( - inst.snapshot(digest=self.key_to_digest(key) if key else None) + inst.snapshot( + digest=self.key_to_digest(key) if key else None, + ttl_seconds=chain_ttl, + ) ) # -------------- run with stream between CMD/RET -------------- #