Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/kaggle/api/kaggle_api_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
ApiSaveKernelResponse,
ApiKernelMetadata,
ApiDeleteKernelRequest,
ApiCancelKernelSessionRequest,
)
from kagglesdk.kernels.types.kernels_enums import KernelsListSortType, KernelsListViewType
from kagglesdk.models.types.model_api_service import (
Expand Down Expand Up @@ -2537,6 +2538,69 @@ def kernels_delete_cli(self, kernel: str, no_confirm: bool = False) -> None:
"""
self.kernels_delete(kernel, no_confirm)

def kernels_cancel(self, kernel):
"""Cancels the latest running session for a kernel.

Args:
kernel: The kernel identifier in the format [owner]/[kernel-name].

Returns:
The cancel response from the API.
"""
if kernel is None:
raise ValueError("A kernel must be specified")
if "/" in kernel:
self.validate_kernel_string(kernel)
kernel_url_list = kernel.split("/")
owner_slug = kernel_url_list[0]
kernel_slug = kernel_url_list[1]
else:
owner_slug = self.get_config_value(self.CONFIG_NAME_USER)
kernel_slug = kernel

with self.build_kaggle_client() as kaggle:
# First, get the session status to obtain the kernel_session_id.
# The SDK response type doesn't expose the session ID, so we make
# a raw HTTP call and parse the full JSON response.
http_client = kaggle.http_client()
status_request = ApiGetKernelSessionStatusRequest()
status_request.user_name = owner_slug
status_request.kernel_slug = kernel_slug
http_request = http_client._prepare_request(
"kernels.KernelsApiService", "GetKernelSessionStatus", status_request
)
settings = http_client._session.merge_environment_settings(
http_request.url, {}, None, None, None
)
http_response = http_client._session.send(http_request, **settings)
http_response.raise_for_status()
response_data = http_response.json()

kernel_session_id = response_data.get("kernelSessionId")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rosbo @jplotts Can either of you confirm that we actually have "kernelSessionId" in the Http Response object? It doesn't show up in VERBOSE output from kaggle k status ..., but that might not print everything.

if not kernel_session_id:
raise ValueError(
f"No active session found for kernel '{kernel}'. "
"The kernel may not have been run or may have already completed."
)

cancel_request = ApiCancelKernelSessionRequest()
cancel_request.kernel_session_id = kernel_session_id
return kaggle.kernels.kernels_api_client.cancel_kernel_session(cancel_request)

def kernels_cancel_cli(self, kernel, kernel_opt=None):
"""A client wrapper for cancelling a kernel session.

Args:
kernel: The kernel for which to cancel the session.
kernel_opt: An additional option from the client, if the kernel is not defined.
"""
kernel = kernel or kernel_opt
response = self.kernels_cancel(kernel)
if response.error_message:
print(f"Cancel failed: {response.error_message}")
else:
print(f"Kernel session for '{kernel}' was cancelled successfully.")

def dataset_delete_cli(self, dataset: str, no_confirm: bool = False) -> None:
"""A client wrapper for deleting a dataset.

Expand Down
15 changes: 14 additions & 1 deletion src/kaggle/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,18 @@ def parse_kernels(subparsers) -> None:
parser_kernels_status._action_groups.append(parser_kernels_status_optional)
parser_kernels_status.set_defaults(func=api.kernels_status_cli)

# Kernels cancel
parser_kernels_cancel = subparsers_kernels.add_parser(
"cancel", formatter_class=argparse.RawTextHelpFormatter, help=Help.command_kernels_cancel
)
parser_kernels_cancel_optional = parser_kernels_cancel._action_groups.pop()
parser_kernels_cancel_optional.add_argument("kernel", nargs="?", default=None, help=Help.param_kernel)
parser_kernels_cancel_optional.add_argument(
"-k", "--kernel", dest="kernel_opt", required=False, help=argparse.SUPPRESS
)
parser_kernels_cancel._action_groups.append(parser_kernels_cancel_optional)
parser_kernels_cancel.set_defaults(func=api.kernels_cancel_cli)

# Kernels delete
parser_kernels_delete = subparsers_kernels.add_parser(
"delete", formatter_class=argparse.RawTextHelpFormatter, help=Help.command_kernels_delete
Expand Down Expand Up @@ -1073,7 +1085,7 @@ class Help(object):
]
competitions_choices = ["list", "files", "download", "submit", "submissions", "leaderboard"]
datasets_choices = ["list", "files", "download", "create", "version", "init", "metadata", "status", "delete"]
kernels_choices = ["list", "files", "get", "init", "push", "pull", "output", "status", "update", "delete"]
kernels_choices = ["list", "files", "get", "init", "push", "pull", "output", "status", "update", "delete", "cancel"]
models_choices = ["instances", "i", "variations", "v", "get", "list", "init", "create", "delete", "update"]
model_instances_choices = ["versions", "v", "get", "files", "list", "init", "create", "delete", "update"]
model_instance_versions_choices = ["init", "create", "download", "delete", "files", "list"]
Expand Down Expand Up @@ -1138,6 +1150,7 @@ class Help(object):
command_kernels_pull = "Pull down code from a kernel"
command_kernels_output = "Get data output from the latest kernel run"
command_kernels_status = "Display the status of the latest kernel run"
command_kernels_cancel = "Cancel the latest running kernel session"
command_kernels_delete = "Delete a kernel"

# Models commands
Expand Down
176 changes: 176 additions & 0 deletions tests/test_kernels_cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# coding=utf-8
import json
import unittest
from unittest.mock import MagicMock, patch, PropertyMock

import sys
import os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from kaggle.api.kaggle_api_extended import KaggleApi
from kagglesdk.kernels.types.kernels_api_service import (
ApiCancelKernelSessionRequest,
ApiCancelKernelSessionResponse,
)


class TestKernelsCancel(unittest.TestCase):
def setUp(self):
self.api = KaggleApi.__new__(KaggleApi)
self.api.args = []
self.api.config_values = {"username": "testuser", "key": "testkey"}
self.api.already_printed_version_warning = True

@patch.object(KaggleApi, "build_kaggle_client")
def test_cancel_successful(self, mock_build_client):
"""Test that a successful cancel prints a success message."""
mock_kaggle = MagicMock()
mock_build_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle)
mock_build_client.return_value.__exit__ = MagicMock(return_value=False)

# Mock the raw HTTP call to get session status with session ID
mock_http_client = MagicMock()
mock_kaggle.http_client.return_value = mock_http_client

mock_http_response = MagicMock()
mock_http_response.json.return_value = {"status": "running", "kernelSessionId": 12345}
mock_http_response.raise_for_status = MagicMock()
mock_http_client._session.send.return_value = mock_http_response
mock_http_client._session.merge_environment_settings.return_value = {}
mock_http_client._prepare_request.return_value = MagicMock(url="http://test")

# Mock the cancel response
cancel_response = ApiCancelKernelSessionResponse()
mock_kaggle.kernels.kernels_api_client.cancel_kernel_session.return_value = cancel_response

result = self.api.kernels_cancel("owner/kernel-slug")

# Verify cancel was called with the correct session ID
call_args = mock_kaggle.kernels.kernels_api_client.cancel_kernel_session.call_args
cancel_request = call_args[0][0]
self.assertEqual(cancel_request.kernel_session_id, 12345)
self.assertEqual(result.error_message, "")

@patch.object(KaggleApi, "build_kaggle_client")
def test_cancel_with_error_response(self, mock_build_client):
"""Test that cancel handles an error response from the API."""
mock_kaggle = MagicMock()
mock_build_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle)
mock_build_client.return_value.__exit__ = MagicMock(return_value=False)

mock_http_client = MagicMock()
mock_kaggle.http_client.return_value = mock_http_client

mock_http_response = MagicMock()
mock_http_response.json.return_value = {"status": "complete", "kernelSessionId": 99999}
mock_http_response.raise_for_status = MagicMock()
mock_http_client._session.send.return_value = mock_http_response
mock_http_client._session.merge_environment_settings.return_value = {}
mock_http_client._prepare_request.return_value = MagicMock(url="http://test")

cancel_response = ApiCancelKernelSessionResponse()
cancel_response.error_message = "Session is not running"
mock_kaggle.kernels.kernels_api_client.cancel_kernel_session.return_value = cancel_response

result = self.api.kernels_cancel("owner/kernel-slug")
self.assertEqual(result.error_message, "Session is not running")

def test_cancel_none_kernel_raises(self):
"""Test that passing None raises ValueError."""
with self.assertRaises(ValueError) as ctx:
self.api.kernels_cancel(None)
self.assertIn("A kernel must be specified", str(ctx.exception))

def test_cancel_invalid_format_raises(self):
"""Test that a kernel slug that is too short raises ValueError."""
with self.assertRaises(ValueError):
self.api.kernels_cancel("owner/ab")

@patch.object(KaggleApi, "build_kaggle_client")
def test_cancel_no_session_id_raises(self, mock_build_client):
"""Test that missing session ID in status response raises ValueError."""
mock_kaggle = MagicMock()
mock_build_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle)
mock_build_client.return_value.__exit__ = MagicMock(return_value=False)

mock_http_client = MagicMock()
mock_kaggle.http_client.return_value = mock_http_client

mock_http_response = MagicMock()
mock_http_response.json.return_value = {"status": "complete"}
mock_http_response.raise_for_status = MagicMock()
mock_http_client._session.send.return_value = mock_http_response
mock_http_client._session.merge_environment_settings.return_value = {}
mock_http_client._prepare_request.return_value = MagicMock(url="http://test")

with self.assertRaises(ValueError) as ctx:
self.api.kernels_cancel("owner/kernel-slug")
self.assertIn("No active session found", str(ctx.exception))

@patch.object(KaggleApi, "build_kaggle_client")
def test_cancel_without_owner_uses_config_user(self, mock_build_client):
"""Test that a kernel slug without owner uses the configured username."""
mock_kaggle = MagicMock()
mock_build_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle)
mock_build_client.return_value.__exit__ = MagicMock(return_value=False)

mock_http_client = MagicMock()
mock_kaggle.http_client.return_value = mock_http_client

mock_http_response = MagicMock()
mock_http_response.json.return_value = {"status": "running", "kernelSessionId": 100}
mock_http_response.raise_for_status = MagicMock()
mock_http_client._session.send.return_value = mock_http_response
mock_http_client._session.merge_environment_settings.return_value = {}
mock_http_client._prepare_request.return_value = MagicMock(url="http://test")

cancel_response = ApiCancelKernelSessionResponse()
mock_kaggle.kernels.kernels_api_client.cancel_kernel_session.return_value = cancel_response

self.api.kernels_cancel("my-kernel")

# Verify the status request used the configured username
prepare_call = mock_http_client._prepare_request.call_args
status_request = prepare_call[0][2]
self.assertEqual(status_request.user_name, "testuser")
self.assertEqual(status_request.kernel_slug, "my-kernel")

@patch("builtins.print")
@patch.object(KaggleApi, "kernels_cancel")
def test_cancel_cli_success(self, mock_cancel, mock_print):
"""Test CLI wrapper prints success message."""
response = ApiCancelKernelSessionResponse()
mock_cancel.return_value = response

self.api.kernels_cancel_cli("owner/kernel-slug")

mock_cancel.assert_called_once_with("owner/kernel-slug")
mock_print.assert_called_once_with("Kernel session for 'owner/kernel-slug' was cancelled successfully.")

@patch("builtins.print")
@patch.object(KaggleApi, "kernels_cancel")
def test_cancel_cli_error(self, mock_cancel, mock_print):
"""Test CLI wrapper prints error message."""
response = ApiCancelKernelSessionResponse()
response.error_message = "Cannot cancel completed session"
mock_cancel.return_value = response

self.api.kernels_cancel_cli("owner/kernel-slug")

mock_print.assert_called_once_with("Cancel failed: Cannot cancel completed session")

@patch("builtins.print")
@patch.object(KaggleApi, "kernels_cancel")
def test_cancel_cli_uses_kernel_opt(self, mock_cancel, mock_print):
"""Test CLI wrapper falls back to kernel_opt argument."""
response = ApiCancelKernelSessionResponse()
mock_cancel.return_value = response

self.api.kernels_cancel_cli(None, kernel_opt="owner/my-kernel")

mock_cancel.assert_called_once_with("owner/my-kernel")


if __name__ == "__main__":
unittest.main()
Loading