diff --git a/openviking/client/local.py b/openviking/client/local.py index a118c2735..2b13625be 100644 --- a/openviking/client/local.py +++ b/openviking/client/local.py @@ -403,7 +403,7 @@ async def commit_session( async def get_task(self, task_id: str) -> Optional[Dict[str, Any]]: """Query background task status.""" - return await self._service.sessions.get_commit_task(task_id) + return await self._service.sessions.get_commit_task(task_id, self._ctx) async def add_message( self, diff --git a/openviking/server/routers/content.py b/openviking/server/routers/content.py index 7f2931595..7801546d0 100644 --- a/openviking/server/routers/content.py +++ b/openviking/server/routers/content.py @@ -178,7 +178,12 @@ async def reindex( if request.wait: # Synchronous path: block until reindex completes - if tracker.has_running(REINDEX_TASK_TYPE, uri): + if tracker.has_running( + REINDEX_TASK_TYPE, + uri, + owner_account_id=_ctx.account_id, + owner_user_id=_ctx.user.user_id, + ): return Response( status="error", error=ErrorInfo( @@ -190,7 +195,12 @@ async def reindex( return Response(status="ok", result=result) else: # Async path: run in background, return task_id for polling - task = tracker.create_if_no_running(REINDEX_TASK_TYPE, uri) + task = tracker.create_if_no_running( + REINDEX_TASK_TYPE, + uri, + owner_account_id=_ctx.account_id, + owner_user_id=_ctx.user.user_id, + ) if task is None: return Response( status="error", diff --git a/openviking/server/routers/tasks.py b/openviking/server/routers/tasks.py index 2c2222239..223b868b6 100644 --- a/openviking/server/routers/tasks.py +++ b/openviking/server/routers/tasks.py @@ -9,8 +9,10 @@ from typing import Optional -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query +from openviking.server.auth import get_request_context +from openviking.server.identity import RequestContext from openviking.server.models import Response from openviking.service.task_tracker import get_task_tracker @@ -18,10 +20,17 @@ @router.get("/tasks/{task_id}") -async def get_task(task_id: str): +async def get_task( + task_id: str, + _ctx: RequestContext = Depends(get_request_context), +): """Get the status of a single background task.""" tracker = get_task_tracker() - task = tracker.get(task_id) + task = tracker.get( + task_id, + owner_account_id=_ctx.account_id, + owner_user_id=_ctx.user.user_id, + ) if not task: raise HTTPException(status_code=404, detail="Task not found or expired") return Response(status="ok", result=task.to_dict()) @@ -35,6 +44,7 @@ async def list_tasks( ), resource_id: Optional[str] = Query(None, description="Filter by resource ID (e.g. session_id)"), limit: int = Query(50, le=200, description="Max results"), + _ctx: RequestContext = Depends(get_request_context), ): """List background tasks with optional filters.""" tracker = get_task_tracker() @@ -43,5 +53,7 @@ async def list_tasks( status=status, resource_id=resource_id, limit=limit, + owner_account_id=_ctx.account_id, + owner_user_id=_ctx.user.user_id, ) return Response(status="ok", result=[t.to_dict() for t in tasks]) diff --git a/openviking/service/session_service.py b/openviking/service/session_service.py index d6272675e..926827256 100644 --- a/openviking/service/session_service.py +++ b/openviking/service/session_service.py @@ -170,9 +170,13 @@ async def commit_async(self, session_id: str, ctx: RequestContext) -> Dict[str, session = await self.get(session_id, ctx) return await session.commit_async() - async def get_commit_task(self, task_id: str) -> Optional[Dict[str, Any]]: - """Query background commit task status by task_id.""" - task = get_task_tracker().get(task_id) + async def get_commit_task(self, task_id: str, ctx: RequestContext) -> Optional[Dict[str, Any]]: + """Query background commit task status by task_id for the calling owner.""" + task = get_task_tracker().get( + task_id, + owner_account_id=ctx.account_id, + owner_user_id=ctx.user.user_id, + ) return task.to_dict() if task else None async def extract(self, session_id: str, ctx: RequestContext) -> List[Any]: diff --git a/openviking/service/task_tracker.py b/openviking/service/task_tracker.py index f7cd437e1..582f0d9f0 100644 --- a/openviking/service/task_tracker.py +++ b/openviking/service/task_tracker.py @@ -48,6 +48,8 @@ class TaskRecord: created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time) resource_id: Optional[str] = None # e.g. session_id + owner_account_id: Optional[str] = None + owner_user_id: Optional[str] = None result: Optional[Dict[str, Any]] = None error: Optional[str] = None @@ -55,6 +57,8 @@ def to_dict(self) -> Dict[str, Any]: """Serialize for JSON response.""" d = asdict(self) d["status"] = self.status.value + d.pop("owner_account_id", None) + d.pop("owner_user_id", None) return d @@ -170,14 +174,43 @@ def _evict_expired(self) -> None: if to_delete: logger.debug("[TaskTracker] Evicted %d expired tasks", len(to_delete)) + @staticmethod + def _matches_owner( + task: TaskRecord, + owner_account_id: Optional[str] = None, + owner_user_id: Optional[str] = None, + ) -> bool: + """Return True when a task belongs to the requested owner filter.""" + if owner_account_id is not None and task.owner_account_id != owner_account_id: + return False + if owner_user_id is not None and task.owner_user_id != owner_user_id: + return False + return True + + @staticmethod + def _validate_owner(owner_account_id: str, owner_user_id: str) -> None: + """Reject ownerless task creation for user-originated background work.""" + if not owner_account_id or not owner_user_id: + raise ValueError("Task ownership requires non-empty owner_account_id and owner_user_id") + # ── CRUD ── - def create(self, task_type: str, resource_id: Optional[str] = None) -> TaskRecord: + def create( + self, + task_type: str, + resource_id: Optional[str] = None, + *, + owner_account_id: str, + owner_user_id: str, + ) -> TaskRecord: """Register a new pending task. Returns a snapshot copy.""" + self._validate_owner(owner_account_id, owner_user_id) task = TaskRecord( task_id=str(uuid4()), task_type=task_type, resource_id=resource_id, + owner_account_id=owner_account_id, + owner_user_id=owner_user_id, ) with self._lock: self._tasks[task.task_id] = task @@ -189,17 +222,26 @@ def create(self, task_type: str, resource_id: Optional[str] = None) -> TaskRecor ) return self._copy(task) - def create_if_no_running(self, task_type: str, resource_id: str) -> Optional[TaskRecord]: + def create_if_no_running( + self, + task_type: str, + resource_id: str, + *, + owner_account_id: str, + owner_user_id: str, + ) -> Optional[TaskRecord]: """Atomically check for running tasks and create a new one if none exist. Returns TaskRecord on success, None if a running task already exists. This eliminates the race condition between has_running() and create(). """ + self._validate_owner(owner_account_id, owner_user_id) with self._lock: # Check for existing running tasks has_active = any( t.task_type == task_type and t.resource_id == resource_id + and self._matches_owner(t, owner_account_id, owner_user_id) and t.status in (TaskStatus.PENDING, TaskStatus.RUNNING) for t in self._tasks.values() ) @@ -210,6 +252,8 @@ def create_if_no_running(self, task_type: str, resource_id: str) -> Optional[Tas task_id=str(uuid4()), task_type=task_type, resource_id=resource_id, + owner_account_id=owner_account_id, + owner_user_id=owner_user_id, ) self._tasks[task.task_id] = task logger.debug( @@ -248,11 +292,18 @@ def fail(self, task_id: str, error: str) -> None: task.updated_at = time.time() logger.warning("[TaskTracker] Task %s failed: %s", task_id, _sanitize_error(error)) - def get(self, task_id: str) -> Optional[TaskRecord]: + def get( + self, + task_id: str, + owner_account_id: Optional[str] = None, + owner_user_id: Optional[str] = None, + ) -> Optional[TaskRecord]: """Look up a single task. Returns a snapshot copy (None if not found).""" with self._lock: task = self._tasks.get(task_id) - return self._copy(task) if task else None + if task is None or not self._matches_owner(task, owner_account_id, owner_user_id): + return None + return self._copy(task) def list_tasks( self, @@ -260,10 +311,16 @@ def list_tasks( status: Optional[str] = None, resource_id: Optional[str] = None, limit: int = 50, + owner_account_id: Optional[str] = None, + owner_user_id: Optional[str] = None, ) -> List[TaskRecord]: """List tasks with optional filters. Most-recent first. Returns snapshot copies.""" with self._lock: - tasks = [self._copy(t) for t in self._tasks.values()] + tasks = [ + self._copy(t) + for t in self._tasks.values() + if self._matches_owner(t, owner_account_id, owner_user_id) + ] if task_type: tasks = [t for t in tasks if t.task_type == task_type] if status: @@ -273,12 +330,19 @@ def list_tasks( tasks.sort(key=lambda t: t.created_at, reverse=True) return tasks[:limit] - def has_running(self, task_type: str, resource_id: str) -> bool: + def has_running( + self, + task_type: str, + resource_id: str, + owner_account_id: Optional[str] = None, + owner_user_id: Optional[str] = None, + ) -> bool: """Check if there is already a running task for the given type+resource.""" with self._lock: return any( t.task_type == task_type and t.resource_id == resource_id + and self._matches_owner(t, owner_account_id, owner_user_id) and t.status in (TaskStatus.PENDING, TaskStatus.RUNNING) for t in self._tasks.values() ) diff --git a/openviking/session/session.py b/openviking/session/session.py index 6c3ca03a4..d65a37a54 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -441,7 +441,12 @@ async def commit_async(self) -> Dict[str, Any]: # Create TaskRecord for tracking Phase 2 tracker = get_task_tracker() - task = tracker.create("session_commit", resource_id=self.session_id) + task = tracker.create( + "session_commit", + resource_id=self.session_id, + owner_account_id=self.ctx.account_id, + owner_user_id=self.ctx.user.user_id, + ) asyncio.create_task( self._run_memory_extraction( diff --git a/tests/server/test_api_content.py b/tests/server/test_api_content.py index f9a958088..babdcc7f0 100644 --- a/tests/server/test_api_content.py +++ b/tests/server/test_api_content.py @@ -105,7 +105,7 @@ async def exists(self, uri, ctx=None): return True class FakeTracker: - def has_running(self, task_type, uri): + def has_running(self, task_type, uri, owner_account_id=None, owner_user_id=None): return False async def fake_do_reindex(service, uri, regenerate, ctx): diff --git a/tests/server/test_auth.py b/tests/server/test_auth.py index 0da9d42ea..dd36b1a90 100644 --- a/tests/server/test_auth.py +++ b/tests/server/test_auth.py @@ -23,6 +23,7 @@ from openviking.server.identity import ResolvedIdentity, Role from openviking.server.models import ERROR_CODE_TO_HTTP_STATUS, ErrorInfo, Response from openviking.service.core import OpenVikingService +from openviking.service.task_tracker import get_task_tracker, reset_task_tracker from openviking_cli.exceptions import InvalidArgumentError, OpenVikingError from openviking_cli.session.user_id import UserIdentifier @@ -128,6 +129,15 @@ async def debug_vector_scroll(ctx=Depends(get_request_context)): return app +def _build_task_http_test_app(identity: ResolvedIdentity | None) -> FastAPI: + """Build a lightweight app that mounts the real task router.""" + from openviking.server.routers import tasks as tasks_router + + app = _build_auth_http_test_app(identity=identity, auth_enabled=True, root_api_key=ROOT_KEY) + app.include_router(tasks_router.router) + return app + + @pytest_asyncio.fixture(scope="function") async def auth_service(temp_dir): """Service for auth tests.""" @@ -276,6 +286,69 @@ async def test_auth_on_multiple_endpoints(auth_client: httpx.AsyncClient): assert tenant_resp.status_code == 200 +async def test_task_endpoints_require_auth(): + """Task endpoints must reject unauthenticated callers before lookup/filtering.""" + reset_task_tracker() + app = _build_task_http_test_app(identity=None) + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client: + for url in ("/api/v1/tasks", "/api/v1/tasks/nonexistent-id"): + resp = await client.get(url) + assert resp.status_code == 401 + reset_task_tracker() + + +async def test_task_endpoints_are_user_scoped(): + """Authenticated callers must not see another user's background tasks.""" + reset_task_tracker() + account_id = _uid() + tracker = get_task_tracker() + alice_task = tracker.create( + "session_commit", + resource_id="alice-session", + owner_account_id=account_id, + owner_user_id="alice", + ) + bob_task = tracker.create( + "session_commit", + resource_id="bob-session", + owner_account_id=account_id, + owner_user_id="bob", + ) + + alice_app = _build_task_http_test_app( + ResolvedIdentity(role=Role.ADMIN, account_id=account_id, user_id="alice") + ) + bob_app = _build_task_http_test_app( + ResolvedIdentity(role=Role.ADMIN, account_id=account_id, user_id="bob") + ) + alice_transport = httpx.ASGITransport(app=alice_app) + bob_transport = httpx.ASGITransport(app=bob_app) + + async with httpx.AsyncClient( + transport=alice_transport, base_url="http://testserver" + ) as alice_client: + alice_get = await alice_client.get(f"/api/v1/tasks/{alice_task.task_id}") + assert alice_get.status_code == 200 + assert alice_get.json()["result"]["resource_id"] == "alice-session" + + alice_list = await alice_client.get("/api/v1/tasks") + assert alice_list.status_code == 200 + assert {task["task_id"] for task in alice_list.json()["result"]} == {alice_task.task_id} + + async with httpx.AsyncClient( + transport=bob_transport, base_url="http://testserver" + ) as bob_client: + bob_get_other = await bob_client.get(f"/api/v1/tasks/{alice_task.task_id}") + assert bob_get_other.status_code == 404 + + bob_list = await bob_client.get("/api/v1/tasks") + assert bob_list.status_code == 200 + assert {task["task_id"] for task in bob_list.json()["result"]} == {bob_task.task_id} + + reset_task_tracker() + + # ---- Role-based access tests ---- diff --git a/tests/test_session_task_tracking.py b/tests/test_session_task_tracking.py index 455db644c..ea1aa5bd1 100644 --- a/tests/test_session_task_tracking.py +++ b/tests/test_session_task_tracking.py @@ -66,7 +66,12 @@ def _make_tracked_commit(behavior="instant", result_overrides=None, gate=None, s async def mock_commit(_sid, _ctx): tracker = get_task_tracker() - task = tracker.create("session_commit", resource_id=_sid) + task = tracker.create( + "session_commit", + resource_id=_sid, + owner_account_id=_ctx.account_id, + owner_user_id=_ctx.user.user_id, + ) archive_uri = f"viking://session/test/{_sid}/history/archive_001" async def _background(): diff --git a/tests/test_task_tracker.py b/tests/test_task_tracker.py index 0cbe972f3..87fd9de2f 100644 --- a/tests/test_task_tracker.py +++ b/tests/test_task_tracker.py @@ -7,6 +7,8 @@ import pytest +from openviking.server.identity import RequestContext, Role +from openviking.service.session_service import SessionService from openviking.service.task_tracker import ( TaskStatus, TaskTracker, @@ -14,6 +16,7 @@ get_task_tracker, reset_task_tracker, ) +from openviking_cli.session.user_id import UserIdentifier @pytest.fixture(autouse=True) @@ -29,11 +32,25 @@ def tracker() -> TaskTracker: return TaskTracker() +def _owner_kwargs(account_id: str = "acme", user_id: str = "alice"): + return { + "owner_account_id": account_id, + "owner_user_id": user_id, + } + + +def _make_ctx(account_id: str = "acme", user_id: str = "alice") -> RequestContext: + return RequestContext( + user=UserIdentifier(account_id, user_id, "agent-1"), + role=Role.ADMIN, + ) + + # ── Basic CRUD ── def test_create_task(tracker: TaskTracker): - task = tracker.create("session_commit", resource_id="sess-123") + task = tracker.create("session_commit", resource_id="sess-123", **_owner_kwargs()) assert task.task_id assert task.task_type == "session_commit" assert task.resource_id == "sess-123" @@ -41,7 +58,7 @@ def test_create_task(tracker: TaskTracker): def test_start_task(tracker: TaskTracker): - task = tracker.create("session_commit") + task = tracker.create("session_commit", **_owner_kwargs()) tracker.start(task.task_id) retrieved = tracker.get(task.task_id) assert retrieved is not None @@ -49,7 +66,7 @@ def test_start_task(tracker: TaskTracker): def test_complete_task(tracker: TaskTracker): - task = tracker.create("session_commit", resource_id="s1") + task = tracker.create("session_commit", resource_id="s1", **_owner_kwargs()) tracker.start(task.task_id) tracker.complete(task.task_id, {"memories_extracted": 3}) retrieved = tracker.get(task.task_id) @@ -59,7 +76,7 @@ def test_complete_task(tracker: TaskTracker): def test_fail_task(tracker: TaskTracker): - task = tracker.create("session_commit") + task = tracker.create("session_commit", **_owner_kwargs()) tracker.start(task.task_id) tracker.fail(task.task_id, "LLM timeout") retrieved = tracker.get(task.task_id) @@ -76,23 +93,23 @@ def test_get_nonexistent_returns_none(tracker: TaskTracker): def test_list_all(tracker: TaskTracker): - tracker.create("session_commit", resource_id="s1") - tracker.create("resource_ingest", resource_id="r1") + tracker.create("session_commit", resource_id="s1", **_owner_kwargs()) + tracker.create("resource_ingest", resource_id="r1", **_owner_kwargs()) tasks = tracker.list_tasks() assert len(tasks) == 2 def test_list_filter_by_type(tracker: TaskTracker): - tracker.create("session_commit") - tracker.create("resource_ingest") + tracker.create("session_commit", **_owner_kwargs()) + tracker.create("resource_ingest", **_owner_kwargs()) tasks = tracker.list_tasks(task_type="session_commit") assert len(tasks) == 1 assert tasks[0].task_type == "session_commit" def test_list_filter_by_status(tracker: TaskTracker): - t1 = tracker.create("session_commit") - tracker.create("session_commit") + t1 = tracker.create("session_commit", **_owner_kwargs()) + tracker.create("session_commit", **_owner_kwargs()) tracker.start(t1.task_id) tracker.complete(t1.task_id, {}) @@ -103,23 +120,61 @@ def test_list_filter_by_status(tracker: TaskTracker): def test_list_filter_by_resource_id(tracker: TaskTracker): - tracker.create("session_commit", resource_id="s1") - tracker.create("session_commit", resource_id="s2") + tracker.create("session_commit", resource_id="s1", **_owner_kwargs()) + tracker.create("session_commit", resource_id="s2", **_owner_kwargs()) tasks = tracker.list_tasks(resource_id="s1") assert len(tasks) == 1 assert tasks[0].resource_id == "s1" +def test_get_hides_task_from_other_owner(tracker: TaskTracker): + task = tracker.create( + "session_commit", + resource_id="s1", + owner_account_id="acme", + owner_user_id="alice", + ) + + assert ( + tracker.get( + task.task_id, + owner_account_id="acme", + owner_user_id="bob", + ) + is None + ) + + +def test_list_tasks_filters_by_owner(tracker: TaskTracker): + tracker.create( + "session_commit", + resource_id="alice-task", + owner_account_id="acme", + owner_user_id="alice", + ) + tracker.create( + "session_commit", + resource_id="bob-task", + owner_account_id="acme", + owner_user_id="bob", + ) + + tasks = tracker.list_tasks(owner_account_id="acme", owner_user_id="alice") + + assert len(tasks) == 1 + assert tasks[0].resource_id == "alice-task" + + def test_list_limit(tracker: TaskTracker): for i in range(10): - tracker.create("session_commit", resource_id=f"s{i}") + tracker.create("session_commit", resource_id=f"s{i}", **_owner_kwargs()) tasks = tracker.list_tasks(limit=3) assert len(tasks) == 3 def test_list_order_most_recent_first(tracker: TaskTracker): - tracker.create("session_commit", resource_id="first") - tracker.create("session_commit", resource_id="second") + tracker.create("session_commit", resource_id="first", **_owner_kwargs()) + tracker.create("session_commit", resource_id="second", **_owner_kwargs()) tasks = tracker.list_tasks() assert tasks[0].resource_id == "second" assert tasks[1].resource_id == "first" @@ -129,41 +184,66 @@ def test_list_order_most_recent_first(tracker: TaskTracker): def test_has_running_detects_pending(tracker: TaskTracker): - tracker.create("session_commit", resource_id="s1") + tracker.create("session_commit", resource_id="s1", **_owner_kwargs()) assert tracker.has_running("session_commit", "s1") is True def test_has_running_detects_running(tracker: TaskTracker): - t = tracker.create("session_commit", resource_id="s1") + t = tracker.create("session_commit", resource_id="s1", **_owner_kwargs()) tracker.start(t.task_id) assert tracker.has_running("session_commit", "s1") is True def test_has_running_false_after_complete(tracker: TaskTracker): - t = tracker.create("session_commit", resource_id="s1") + t = tracker.create("session_commit", resource_id="s1", **_owner_kwargs()) tracker.start(t.task_id) tracker.complete(t.task_id, {}) assert tracker.has_running("session_commit", "s1") is False def test_has_running_false_after_fail(tracker: TaskTracker): - t = tracker.create("session_commit", resource_id="s1") + t = tracker.create("session_commit", resource_id="s1", **_owner_kwargs()) tracker.start(t.task_id) tracker.fail(t.task_id, "error") assert tracker.has_running("session_commit", "s1") is False +def test_create_if_no_running_isolated_by_owner(tracker: TaskTracker): + alice_task = tracker.create_if_no_running( + "reindex", + "viking://resources/demo", + owner_account_id="acme", + owner_user_id="alice", + ) + bob_task = tracker.create_if_no_running( + "reindex", + "viking://resources/demo", + owner_account_id="acme", + owner_user_id="bob", + ) + + assert alice_task is not None + assert bob_task is not None + assert alice_task.task_id != bob_task.task_id + + # ── Serialization ── def test_to_dict(tracker: TaskTracker): - task = tracker.create("session_commit", resource_id="s1") + task = tracker.create( + "session_commit", + resource_id="s1", + **_owner_kwargs(), + ) d = task.to_dict() assert d["task_id"] == task.task_id assert d["status"] == "pending" assert d["task_type"] == "session_commit" assert d["resource_id"] == "s1" assert isinstance(d["created_at"], float) + assert "owner_account_id" not in d + assert "owner_user_id" not in d # ── Sanitization ── @@ -197,7 +277,7 @@ def test_sanitize_preserves_safe_error(): def test_evict_expired_completed(tracker: TaskTracker): - t = tracker.create("session_commit") + t = tracker.create("session_commit", **_owner_kwargs()) tracker.start(t.task_id) tracker.complete(t.task_id, {}) # Simulate old timestamp (access internal state; get() returns defensive copies) @@ -207,7 +287,7 @@ def test_evict_expired_completed(tracker: TaskTracker): def test_evict_keeps_recent_completed(tracker: TaskTracker): - t = tracker.create("session_commit") + t = tracker.create("session_commit", **_owner_kwargs()) tracker.start(t.task_id) tracker.complete(t.task_id, {}) tracker._evict_expired() @@ -218,7 +298,7 @@ def test_evict_fifo_when_over_limit(tracker: TaskTracker): tracker.MAX_TASKS = 5 tasks = [] for i in range(7): - tasks.append(tracker.create("session_commit", resource_id=f"s{i}")) + tasks.append(tracker.create("session_commit", resource_id=f"s{i}", **_owner_kwargs())) tracker._evict_expired() assert tracker.count() == 5 # Oldest should be gone @@ -242,3 +322,38 @@ def test_singleton_reset(): reset_task_tracker() t2 = get_task_tracker() assert t1 is not t2 + + +def test_create_requires_owner(tracker: TaskTracker): + with pytest.raises(TypeError): + tracker.create("session_commit", resource_id="sess-123") + + +def test_create_if_no_running_requires_owner(tracker: TaskTracker): + with pytest.raises(TypeError): + tracker.create_if_no_running("reindex", "viking://resources/demo") + + +def test_create_rejects_blank_owner_values(tracker: TaskTracker): + with pytest.raises(ValueError, match="Task ownership requires"): + tracker.create( + "session_commit", + resource_id="sess-123", + owner_account_id="", + owner_user_id="alice", + ) + + +@pytest.mark.asyncio +async def test_session_service_get_commit_task_is_owner_scoped(): + tracker = get_task_tracker() + task = tracker.create("session_commit", resource_id="sess-123", **_owner_kwargs()) + service = SessionService() + + owner_result = await service.get_commit_task(task.task_id, _make_ctx()) + other_result = await service.get_commit_task(task.task_id, _make_ctx(user_id="bob")) + + assert owner_result is not None + assert owner_result["task_id"] == task.task_id + assert owner_result["resource_id"] == "sess-123" + assert other_result is None