Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from io import BytesIO
from typing import Annotated, Any

import httpx
from httpx import AsyncClient, HTTPStatusError
from pydantic import ValidationError

from semantic_kernel.connectors.telemetry import HTTP_USER_AGENT, version_info
Expand Down Expand Up @@ -35,14 +35,14 @@ class SessionsPythonTool(KernelBaseModel):
pool_management_endpoint: HttpsUrl
settings: SessionsPythonSettings
auth_callback: Callable[..., Awaitable[Any]]
http_client: httpx.AsyncClient
http_client: AsyncClient

def __init__(
self,
auth_callback: Callable[..., Awaitable[Any]],
pool_management_endpoint: str | None = None,
settings: SessionsPythonSettings | None = None,
http_client: httpx.AsyncClient | None = None,
http_client: AsyncClient | None = None,
env_file_path: str | None = None,
**kwargs,
):
Expand All @@ -59,7 +59,7 @@ def __init__(
settings = SessionsPythonSettings()

if not http_client:
http_client = httpx.AsyncClient()
http_client = AsyncClient()

super().__init__(
pool_management_endpoint=aca_settings.pool_management_endpoint,
Expand All @@ -69,6 +69,7 @@ def __init__(
**kwargs,
)

# region Helper Methods
async def _ensure_auth_token(self) -> str:
"""Ensure the auth token is valid."""
try:
Expand Down Expand Up @@ -111,8 +112,15 @@ def _build_url_with_version(self, base_url, endpoint, params):
"""Builds a URL with the provided base URL, endpoint, and query parameters."""
params["api-version"] = SESSIONS_API_VERSION
query_string = "&".join([f"{key}={value}" for key, value in params.items()])
if not base_url.endswith("/"):
base_url += "/"
if endpoint.endswith("/"):
endpoint = endpoint[:-1]
return f"{base_url}{endpoint}?{query_string}"

# endregion

# region Kernel Functions
@kernel_function(
description="""Executes the provided Python code.
Start and end the code snippet with double quotes to define it as a string.
Expand Down Expand Up @@ -159,19 +167,24 @@ async def execute_code(self, code: Annotated[str, "The valid Python code to exec
}

url = self._build_url_with_version(
base_url=self.pool_management_endpoint,
endpoint="python/execute/",
base_url=str(self.pool_management_endpoint),
endpoint="code/execute/",
params={"identifier": self.settings.session_id},
)

response = await self.http_client.post(
url=url,
json=request_body,
)
response.raise_for_status()

result = response.json()
return f"Result:\n{result['result']}Stdout:\n{result['stdout']}Stderr:\n{result['stderr']}"
try:
response = await self.http_client.post(
url=url,
json=request_body,
)
response.raise_for_status()
result = response.json()["properties"]
return f"Result:\n{result['result']}Stdout:\n{result['stdout']}Stderr:\n{result['stderr']}"
except HTTPStatusError as e:
error_message = e.response.text if e.response.text else e.response.reason_phrase
raise FunctionExecutionException(
f"Code execution failed with status code {e.response.status_code} and error: {error_message}"
) from e

@kernel_function(name="upload_file", description="Uploads a file for the current Session ID")
async def upload_file(
Expand Down Expand Up @@ -199,32 +212,32 @@ async def upload_file(

remote_file_path = self._construct_remote_file_path(remote_file_path or os.path.basename(local_file_path))

with open(local_file_path, "rb") as data:
auth_token = await self._ensure_auth_token()
self.http_client.headers.update(
{
"Authorization": f"Bearer {auth_token}",
USER_AGENT: SESSIONS_USER_AGENT,
}
)
files = [("file", (remote_file_path, data, "application/octet-stream"))]

url = self._build_url_with_version(
base_url=self.pool_management_endpoint,
endpoint="python/uploadFile",
params={"identifier": self.settings.session_id},
)

response = await self.http_client.post(
url=url,
json={},
files=files, # type: ignore
)
auth_token = await self._ensure_auth_token()
self.http_client.headers.update(
{
"Authorization": f"Bearer {auth_token}",
USER_AGENT: SESSIONS_USER_AGENT,
}
)

response.raise_for_status()
url = self._build_url_with_version(
base_url=str(self.pool_management_endpoint),
endpoint="files/upload",
params={"identifier": self.settings.session_id},
)

response_json = response.json()
return SessionsRemoteFileMetadata.from_dict(response_json["$values"][0])
try:
with open(local_file_path, "rb") as data:
files = {"file": (remote_file_path, data, "application/octet-stream")}
response = await self.http_client.post(url=url, files=files)
response.raise_for_status()
response_json = response.json()
return SessionsRemoteFileMetadata.from_dict(response_json["value"][0]["properties"])
except HTTPStatusError as e:
error_message = e.response.text if e.response.text else e.response.reason_phrase
raise FunctionExecutionException(
f"Upload failed with status code {e.response.status_code} and error: {error_message}"
Comment thread
moonbox3 marked this conversation as resolved.
) from e

@kernel_function(name="list_files", description="Lists all files in the provided Session ID")
async def list_files(self) -> list[SessionsRemoteFileMetadata]:
Expand All @@ -242,26 +255,36 @@ async def list_files(self) -> list[SessionsRemoteFileMetadata]:
)

url = self._build_url_with_version(
base_url=self.pool_management_endpoint,
endpoint="python/files",
base_url=str(self.pool_management_endpoint),
endpoint="files",
params={"identifier": self.settings.session_id},
)

response = await self.http_client.get(
url=url,
)
response.raise_for_status()

response_json = response.json()
return [SessionsRemoteFileMetadata.from_dict(entry) for entry in response_json["$values"]]

async def download_file(self, *, remote_file_path: str, local_file_path: str | None = None) -> BytesIO | None:
try:
response = await self.http_client.get(
url=url,
)
response.raise_for_status()
response_json = response.json()
return [SessionsRemoteFileMetadata.from_dict(entry["properties"]) for entry in response_json["value"]]
except HTTPStatusError as e:
error_message = e.response.text if e.response.text else e.response.reason_phrase
raise FunctionExecutionException(
f"List files failed with status code {e.response.status_code} and error: {error_message}"
) from e

async def download_file(
self,
*,
remote_file_name: Annotated[str, "The name of the file to download, relative to /mnt/data"],
local_file_path: Annotated[str | None, "The local file path to save the file to, optional"] = None,
) -> Annotated[BytesIO | None, "The data of the downloaded file"]:
"""Download a file from the session pool.

Args:
remote_file_path: The path to download the file from, relative to `/mnt/data`.
local_file_path: The path to save the downloaded file to. If not provided, the
file is returned as a BufferedReader.
remote_file_name: The name of the file to download, relative to `/mnt/data`.
local_file_path: The path to save the downloaded file to. Should include the extension.
If not provided, the file is returned as a BufferedReader.

Returns:
BufferedReader: The data of the downloaded file.
Expand All @@ -275,19 +298,25 @@ async def download_file(self, *, remote_file_path: str, local_file_path: str | N
)

url = self._build_url_with_version(
base_url=self.pool_management_endpoint,
endpoint="python/downloadFile",
params={"identifier": self.settings.session_id, "filename": remote_file_path},
)

response = await self.http_client.get(
url=url,
base_url=str(self.pool_management_endpoint),
endpoint=f"files/content/{remote_file_name}",
params={"identifier": self.settings.session_id},
)
response.raise_for_status()

if local_file_path:
with open(local_file_path, "wb") as f:
f.write(response.content)
return None

return BytesIO(response.content)
try:
response = await self.http_client.get(
url=url,
)
response.raise_for_status()
if local_file_path:
with open(local_file_path, "wb") as f:
f.write(response.content)
return None

return BytesIO(response.content)
except HTTPStatusError as e:
error_message = e.response.text if e.response.text else e.response.reason_phrase
raise FunctionExecutionException(
f"Download failed with status code {e.response.status_code} and error: {error_message}"
) from e
# endregion
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ class CodeExecutionType(str, Enum):
class SessionsPythonSettings(KernelBaseModel):
"""The Sessions Python code interpreter settings."""

session_id: str | None = Field(default_factory=lambda: str(uuid.uuid4()), alias="identifier")
session_id: str | None = Field(default_factory=lambda: str(uuid.uuid4()), alias="identifier", exclude=True)
code_input_type: CodeInputType | None = Field(default=CodeInputType.Inline, alias="codeInputType")
execution_type: CodeExecutionType | None = Field(default=CodeExecutionType.Synchronous, alias="executionType")
python_code: str | None = Field(alias="pythonCode", default=None)
python_code: str | None = Field(alias="code", default=None)
timeout_in_sec: int | None = Field(default=100, alias="timeoutInSeconds")
sanitize_input: bool | None = Field(default=True, alias="sanitizeInput")

Expand Down
Loading