From 4b76134bb837804c339adbd85c424a29910447c5 Mon Sep 17 00:00:00 2001 From: tony-chinchill Date: Wed, 20 May 2026 21:34:49 -0700 Subject: [PATCH] fix(adapters): accept dict StreamChunks in slack/github/google_chat streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `_from_full_stream` (thread.py:1218-1226) explicitly forwards `{"type": "markdown_text" | "task_update" | "plan_update", ...}` dict chunks unchanged, so downstream adapters MUST accept both the dataclass and dict shapes of `StreamChunk`. The Teams adapter already does (adapter.py:927), but slack/github/google_chat only check `hasattr(chunk, "type")` — which is False for dicts in Python — so dict chunks were silently dropped or, worse, raised AttributeError inside Slack's `send_structured_chunk` and downgraded the entire stream to text-only with a misleading warning about missing assistant_view scopes. Changes: - slack/adapter.py: route dict markdown_text through the renderer; add a `_read(name)` helper in `send_structured_chunk` so dict and dataclass field access share one code path; tighten the fallback warning to enumerate the actual possible causes (missing scope OR malformed chunk OR transient Slack API error) instead of pointing solely at the manifest. - github/adapter.py: accept dict markdown_text chunks in the accumulator. - google_chat/adapter.py: same. - test_slack_api.py: two regression tests in TestStream covering the two distinct adapter branches affected (renderer dispatch + structured chunk forwarding). Both fail when the dict branches are reverted. This brings the Python adapters in line with the SDK's own internal contract — adapters do not enforce a dataclass-only shape for chunks that `_from_full_stream` is documented and tested to pass through as dicts. --- src/chat_sdk/adapters/github/adapter.py | 4 + src/chat_sdk/adapters/google_chat/adapter.py | 4 + src/chat_sdk/adapters/slack/adapter.py | 47 +++++++---- tests/test_slack_api.py | 86 ++++++++++++++++++++ 4 files changed, 126 insertions(+), 15 deletions(-) diff --git a/src/chat_sdk/adapters/github/adapter.py b/src/chat_sdk/adapters/github/adapter.py index 1f11849..34a2d5c 100644 --- a/src/chat_sdk/adapters/github/adapter.py +++ b/src/chat_sdk/adapters/github/adapter.py @@ -529,6 +529,10 @@ async def stream( async for chunk in text_stream: if isinstance(chunk, str): text += chunk + elif isinstance(chunk, dict) and chunk.get("type") == "markdown_text": + text_value = chunk.get("text") + if isinstance(text_value, str): + text += text_value elif hasattr(chunk, "type") and chunk.type == "markdown_text": text += chunk.text return await self.post_message(thread_id, PostableMarkdown(markdown=text)) diff --git a/src/chat_sdk/adapters/google_chat/adapter.py b/src/chat_sdk/adapters/google_chat/adapter.py index 10503b3..468b45d 100644 --- a/src/chat_sdk/adapters/google_chat/adapter.py +++ b/src/chat_sdk/adapters/google_chat/adapter.py @@ -1614,6 +1614,10 @@ async def stream( async for chunk in text_stream: if isinstance(chunk, str): accumulated += chunk + elif isinstance(chunk, dict) and chunk.get("type") == "markdown_text": + text_value = chunk.get("text") + if isinstance(text_value, str): + accumulated += text_value elif hasattr(chunk, "type") and chunk.type == "markdown_text": accumulated += chunk.text return await self.post_message(thread_id, PostableMarkdown(markdown=accumulated)) diff --git a/src/chat_sdk/adapters/slack/adapter.py b/src/chat_sdk/adapters/slack/adapter.py index 918240a..62f356d 100644 --- a/src/chat_sdk/adapters/slack/adapter.py +++ b/src/chat_sdk/adapters/slack/adapter.py @@ -2384,7 +2384,7 @@ async def flush_markdown_delta(delta: str) -> None: else: await streamer.append(markdown_text=delta) - async def send_structured_chunk(chunk: StreamChunk) -> None: + async def send_structured_chunk(chunk: StreamChunk | dict[str, Any]) -> None: nonlocal first, last_appended, structured_chunks_supported if not structured_chunks_supported: return @@ -2393,18 +2393,25 @@ async def send_structured_chunk(chunk: StreamChunk) -> None: await flush_markdown_delta(delta) last_appended = committable + def _read(name: str) -> Any: + if isinstance(chunk, dict): + return chunk.get(name) + return getattr(chunk, name, None) + + chunk_type = _read("type") + if not chunk_type: + self._logger.warn( + "Slack stream: ignoring chunk with no `type` field", + {"chunkRepr": repr(chunk)[:200]}, + ) + return + try: - chunk_data = {"type": chunk.type} # type: ignore[union-attr] - if hasattr(chunk, "id"): - chunk_data["id"] = chunk.id # type: ignore[union-attr] - if hasattr(chunk, "title"): - chunk_data["title"] = chunk.title # type: ignore[union-attr] - if hasattr(chunk, "status"): - chunk_data["status"] = chunk.status # type: ignore[union-attr] - if hasattr(chunk, "output") and chunk.output is not None: # type: ignore[union-attr] - chunk_data["output"] = chunk.output # type: ignore[union-attr] - if hasattr(chunk, "text"): - chunk_data["text"] = chunk.text # type: ignore[union-attr] + chunk_data: dict[str, Any] = {"type": chunk_type} + for field_name in ("id", "title", "status", "output", "text"): + value = _read(field_name) + if value is not None: + chunk_data[field_name] = value if first: await streamer.append(chunks=[chunk_data], token=token) @@ -2414,9 +2421,11 @@ async def send_structured_chunk(chunk: StreamChunk) -> None: except Exception as exc: structured_chunks_supported = False self._logger.warn( - "Structured streaming chunk failed, falling back to text-only streaming. " - "Ensure your Slack app manifest includes assistant_view, assistant:write scope.", - {"chunkType": getattr(chunk, "type", "unknown"), "error": exc}, + "Slack stream: structured-chunk append failed, falling back to " + "text-only for the rest of this stream. Likely causes: missing " + "`assistant_view` / `assistant:write` scope on the app manifest, " + "malformed chunk payload, or a transient Slack API error.", + {"chunkType": chunk_type, "error": exc}, ) async def push_text_and_flush(text: str) -> None: @@ -2430,6 +2439,14 @@ async def push_text_and_flush(text: str) -> None: async for chunk in text_stream: if isinstance(chunk, str): await push_text_and_flush(chunk) + elif isinstance(chunk, dict) and chunk.get("type") == "markdown_text": + # Dict-shaped StreamChunks are part of the contract: the + # `_from_full_stream` normalizer in thread.py forwards dict + # `{type: "markdown_text", ...}` items unchanged, so adapters + # must handle them the same as the dataclass form. + text_value = chunk.get("text") + if isinstance(text_value, str) and text_value: + await push_text_and_flush(text_value) elif hasattr(chunk, "type") and chunk.type == "markdown_text": # type: ignore[union-attr] await push_text_and_flush(chunk.text) # type: ignore[union-attr] else: diff --git a/tests/test_slack_api.py b/tests/test_slack_api.py index c5fc7ed..940ae88 100644 --- a/tests/test_slack_api.py +++ b/tests/test_slack_api.py @@ -964,6 +964,92 @@ async def chunk_gen() -> AsyncIterator[StreamChunk | str]: assert result.id == "777.777" + @pytest.mark.asyncio + async def test_stream_accepts_dict_markdown_text_chunks(self): + """Dict-shaped markdown_text chunks must flow into the renderer the + same as the dataclass form — `_from_full_stream` in thread.py + forwards them unchanged, so adapters must accept both shapes. + """ + adapter, client, _ = await _init_adapter() + + mock_streamer = MagicMock() + mock_streamer.append = AsyncMock() + mock_streamer.stop = AsyncMock(return_value={"message": {"ts": "666.666"}}) + client.chat_stream = AsyncMock(return_value=mock_streamer) + + async def chunk_gen() -> AsyncIterator[Any]: + yield {"type": "markdown_text", "text": "Hello "} + yield {"type": "markdown_text", "text": "world"} + + result = await adapter.stream( + "slack:C123:1234567890.000000", + chunk_gen(), + StreamOptions(recipient_user_id="U1", recipient_team_id="T1"), + ) + + assert result.id == "666.666" + appended_text = "".join( + call.kwargs.get("markdown_text", "") + for call in mock_streamer.append.call_args_list + ) + assert "Hello world" in appended_text, ( + "dict markdown_text chunks were not routed through the " + f"markdown renderer; streamer.append received {appended_text!r}" + ) + + @pytest.mark.asyncio + async def test_stream_accepts_dict_task_update_chunks(self): + """Dict-shaped task_update chunks must be forwarded as structured + chunks to `streamer.append(chunks=...)`, matching the dataclass form. + """ + adapter, client, _ = await _init_adapter() + + mock_streamer = MagicMock() + mock_streamer.append = AsyncMock() + mock_streamer.stop = AsyncMock(return_value={"message": {"ts": "555.555"}}) + client.chat_stream = AsyncMock(return_value=mock_streamer) + + async def chunk_gen() -> AsyncIterator[Any]: + yield "Starting " + yield { + "type": "task_update", + "id": "task1", + "title": "Search", + "status": "in_progress", + } + yield { + "type": "task_update", + "id": "task1", + "title": "Search", + "status": "complete", + "output": "Found 5", + } + + result = await adapter.stream( + "slack:C123:1234567890.000000", + chunk_gen(), + StreamOptions(recipient_user_id="U1", recipient_team_id="T1"), + ) + + assert result.id == "555.555" + structured_calls = [ + call for call in mock_streamer.append.call_args_list + if "chunks" in call.kwargs + ] + assert structured_calls, ( + "dict-shaped task_update chunks were not forwarded as " + "structured chunks to streamer.append(chunks=...)" + ) + all_forwarded_chunks = [ + chunk + for call in structured_calls + for chunk in call.kwargs["chunks"] + ] + assert any(c.get("type") == "task_update" for c in all_forwarded_chunks), ( + "no forwarded chunk had type=task_update; got: " + f"{all_forwarded_chunks!r}" + ) + @pytest.mark.asyncio async def test_stream_awaits_chat_stream_coroutine(self): """Regression test for issue #44: ``AsyncWebClient.chat_stream`` is a