-
Notifications
You must be signed in to change notification settings - Fork 1
fix(adapters): accept dict StreamChunks in slack/github/google_chat streams #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -3701,7 +3701,7 @@ async def flush_markdown_delta(delta: str) -> None: | |||||
| else: | ||||||
| await streamer.append(markdown_text=delta) | ||||||
|
|
||||||
| async def send_structured_chunk(chunk: StreamChunk) -> None: | ||||||
| async def send_structured_chunk(chunk: StreamChunk | dict[str, Any]) -> None: | ||||||
| nonlocal first, last_appended, structured_chunks_supported | ||||||
| if not structured_chunks_supported: | ||||||
| return | ||||||
|
|
@@ -3710,18 +3710,25 @@ async def send_structured_chunk(chunk: StreamChunk) -> None: | |||||
| await flush_markdown_delta(delta) | ||||||
| last_appended = committable | ||||||
|
|
||||||
| def _read(name: str) -> Any: | ||||||
| if isinstance(chunk, dict): | ||||||
| return chunk.get(name) | ||||||
| return getattr(chunk, name, None) | ||||||
|
|
||||||
| chunk_type = _read("type") | ||||||
| if not chunk_type: | ||||||
| self._logger.warn( | ||||||
| "Slack stream: ignoring chunk with no `type` field", | ||||||
| {"chunkRepr": repr(chunk)[:200]}, | ||||||
| ) | ||||||
| return | ||||||
|
|
||||||
| try: | ||||||
| chunk_data = {"type": chunk.type} # type: ignore[union-attr] | ||||||
| if hasattr(chunk, "id"): | ||||||
| chunk_data["id"] = chunk.id # type: ignore[union-attr] | ||||||
| if hasattr(chunk, "title"): | ||||||
| chunk_data["title"] = chunk.title # type: ignore[union-attr] | ||||||
| if hasattr(chunk, "status"): | ||||||
| chunk_data["status"] = chunk.status # type: ignore[union-attr] | ||||||
| if hasattr(chunk, "output") and chunk.output is not None: # type: ignore[union-attr] | ||||||
| chunk_data["output"] = chunk.output # type: ignore[union-attr] | ||||||
| if hasattr(chunk, "text"): | ||||||
| chunk_data["text"] = chunk.text # type: ignore[union-attr] | ||||||
| chunk_data: dict[str, Any] = {"type": chunk_type} | ||||||
| for field_name in ("id", "title", "status", "output", "text"): | ||||||
| value = _read(field_name) | ||||||
| if value is not None: | ||||||
| chunk_data[field_name] = value | ||||||
|
|
||||||
| if first: | ||||||
| await streamer.append(chunks=[chunk_data], token=token) | ||||||
|
|
@@ -3731,9 +3738,11 @@ async def send_structured_chunk(chunk: StreamChunk) -> None: | |||||
| except Exception as exc: | ||||||
| structured_chunks_supported = False | ||||||
| self._logger.warn( | ||||||
| "Structured streaming chunk failed, falling back to text-only streaming. " | ||||||
| "Ensure your Slack app manifest includes assistant_view, assistant:write scope.", | ||||||
| {"chunkType": getattr(chunk, "type", "unknown"), "error": exc}, | ||||||
| "Slack stream: structured-chunk append failed, falling back to " | ||||||
| "text-only for the rest of this stream. Likely causes: missing " | ||||||
| "`assistant_view` / `assistant:write` scope on the app manifest, " | ||||||
| "malformed chunk payload, or a transient Slack API error.", | ||||||
| {"chunkType": chunk_type, "error": exc}, | ||||||
| ) | ||||||
|
|
||||||
| async def push_text_and_flush(text: str) -> None: | ||||||
|
|
@@ -3747,6 +3756,14 @@ async def push_text_and_flush(text: str) -> None: | |||||
| async for chunk in text_stream: | ||||||
| if isinstance(chunk, str): | ||||||
| await push_text_and_flush(chunk) | ||||||
| elif isinstance(chunk, dict) and chunk.get("type") == "markdown_text": | ||||||
| # Dict-shaped StreamChunks are part of the contract: the | ||||||
| # `_from_full_stream` normalizer in thread.py forwards dict | ||||||
| # `{type: "markdown_text", ...}` items unchanged, so adapters | ||||||
| # must handle them the same as the dataclass form. | ||||||
| text_value = chunk.get("text") | ||||||
| if isinstance(text_value, str) and text_value: | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The truthiness check
Suggested change
References
|
||||||
| await push_text_and_flush(text_value) | ||||||
| elif hasattr(chunk, "type") and chunk.type == "markdown_text": # type: ignore[union-attr] | ||||||
| await push_text_and_flush(chunk.text) # type: ignore[union-attr] | ||||||
| else: | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -964,6 +964,92 @@ async def chunk_gen() -> AsyncIterator[StreamChunk | str]: | |
|
|
||
| assert result.id == "777.777" | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_stream_accepts_dict_markdown_text_chunks(self): | ||
| """Dict-shaped markdown_text chunks must flow into the renderer the | ||
| same as the dataclass form — `_from_full_stream` in thread.py | ||
| forwards them unchanged, so adapters must accept both shapes. | ||
| """ | ||
| adapter, client, _ = await _init_adapter() | ||
|
|
||
| mock_streamer = MagicMock() | ||
| mock_streamer.append = AsyncMock() | ||
| mock_streamer.stop = AsyncMock(return_value={"message": {"ts": "666.666"}}) | ||
| client.chat_stream = AsyncMock(return_value=mock_streamer) | ||
|
|
||
| async def chunk_gen() -> AsyncIterator[Any]: | ||
| yield {"type": "markdown_text", "text": "Hello "} | ||
| yield {"type": "markdown_text", "text": "world"} | ||
|
|
||
| result = await adapter.stream( | ||
| "slack:C123:1234567890.000000", | ||
| chunk_gen(), | ||
| StreamOptions(recipient_user_id="U1", recipient_team_id="T1"), | ||
| ) | ||
|
|
||
| assert result.id == "666.666" | ||
| appended_text = "".join( | ||
| call.kwargs.get("markdown_text", "") | ||
| for call in mock_streamer.append.call_args_list | ||
| ) | ||
| assert "Hello world" in appended_text, ( | ||
| "dict markdown_text chunks were not routed through the " | ||
| f"markdown renderer; streamer.append received {appended_text!r}" | ||
| ) | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_stream_accepts_dict_task_update_chunks(self): | ||
| """Dict-shaped task_update chunks must be forwarded as structured | ||
| chunks to `streamer.append(chunks=...)`, matching the dataclass form. | ||
| """ | ||
| adapter, client, _ = await _init_adapter() | ||
|
|
||
| mock_streamer = MagicMock() | ||
| mock_streamer.append = AsyncMock() | ||
| mock_streamer.stop = AsyncMock(return_value={"message": {"ts": "555.555"}}) | ||
| client.chat_stream = AsyncMock(return_value=mock_streamer) | ||
|
|
||
| async def chunk_gen() -> AsyncIterator[Any]: | ||
| yield "Starting " | ||
| yield { | ||
| "type": "task_update", | ||
| "id": "task1", | ||
| "title": "Search", | ||
| "status": "in_progress", | ||
| } | ||
| yield { | ||
| "type": "task_update", | ||
| "id": "task1", | ||
| "title": "Search", | ||
| "status": "complete", | ||
| "output": "Found 5", | ||
| } | ||
|
|
||
| result = await adapter.stream( | ||
| "slack:C123:1234567890.000000", | ||
| chunk_gen(), | ||
| StreamOptions(recipient_user_id="U1", recipient_team_id="T1"), | ||
| ) | ||
|
|
||
| assert result.id == "555.555" | ||
| structured_calls = [ | ||
| call for call in mock_streamer.append.call_args_list | ||
| if "chunks" in call.kwargs | ||
| ] | ||
| assert structured_calls, ( | ||
| "dict-shaped task_update chunks were not forwarded as " | ||
| "structured chunks to streamer.append(chunks=...)" | ||
| ) | ||
| all_forwarded_chunks = [ | ||
| chunk | ||
| for call in structured_calls | ||
| for chunk in call.kwargs["chunks"] | ||
| ] | ||
| assert any(c.get("type") == "task_update" for c in all_forwarded_chunks), ( | ||
| "no forwarded chunk had type=task_update; got: " | ||
| f"{all_forwarded_chunks!r}" | ||
| ) | ||
|
Comment on lines
+967
to
+1051
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix formatting to pass CI. The pipeline indicates 🤖 Prompt for AI Agents |
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_stream_awaits_chat_stream_coroutine(self): | ||
| """Regression test for issue #44: ``AsyncWebClient.chat_stream`` is a | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a truthiness check
if not chunk_type:on an optional value violates the general rule to useis not None. Ifchunk_typeis an empty string, it will be silently ignored and the function will return early without processing the chunk.References
is not Noneinstead of a truthiness check to avoid silently ignoring them.