Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions experimental/aitools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Current commands:
- `databricks experimental aitools tools query`
- `databricks experimental aitools tools discover-schema`
- `databricks experimental aitools tools get-default-warehouse`
- `databricks experimental aitools tools statement submit`
- `databricks experimental aitools tools statement get`
- `databricks experimental aitools tools statement status`
- `databricks experimental aitools tools statement cancel`

Current behavior:

Expand All @@ -29,6 +33,19 @@ Current behavior:
"SELECT vendor_id, count(*) FROM samples.nyctaxi.trips GROUP BY 1"
```

- `tools statement` is a low-level lifecycle for asynchronous statements.
`submit` returns a `statement_id` immediately, `get` polls until terminal
and emits rows, `status` peeks without blocking, and `cancel` requests
termination. Ctrl+C on `get` stops polling but does NOT cancel the
server-side statement; use `cancel` for that.

```bash
SID=$(databricks experimental aitools tools statement submit \
--warehouse <wh> "SELECT pg_sleep(5)" | jq -r '.statement_id')
databricks experimental aitools tools statement status "$SID"
databricks experimental aitools tools statement get "$SID"
```

Removed behavior:

- there is no MCP server under `experimental aitools`
Expand Down
77 changes: 77 additions & 0 deletions experimental/aitools/cmd/statement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package aitools

import (
"encoding/json"
"fmt"
"io"

"github.com/databricks/databricks-sdk-go/service/sql"
"github.com/spf13/cobra"
)

// statementInfo is the JSON shape emitted by every `tools statement`
// subcommand. Fields are populated as the subcommand has them. omitempty keeps
// the output tight: `submit` doesn't emit columns/rows, `cancel` doesn't emit a
// warehouse_id, etc.
type statementInfo struct {
StatementID string `json:"statement_id"`
State sql.StatementState `json:"state,omitempty"`
WarehouseID string `json:"warehouse_id,omitempty"`
Columns []string `json:"columns,omitempty"`
Rows [][]string `json:"rows,omitempty"`
Error *batchResultError `json:"error,omitempty"`
}

func renderStatementInfo(w io.Writer, info statementInfo) error {
data, err := json.MarshalIndent(info, "", " ")
if err != nil {
return fmt.Errorf("marshal statement info: %w", err)
}
fmt.Fprintf(w, "%s\n", data)
return nil
}

// statementErrorFromStatus builds a batchResultError for any terminal non-success
// state (FAILED, CANCELED, CLOSED), populating it from the server's ServiceError
// when available and synthesizing a message when it isn't. Returns nil for
// SUCCEEDED, non-terminal states, and nil status. The synthesized fallback
// matters because the Statements API can hand back a non-success terminal state
// with `Error == nil`, and skill consumers should be able to branch on
// `error == null` alone instead of inspecting `state`.
func statementErrorFromStatus(status *sql.StatementStatus) *batchResultError {
if status == nil || !isTerminalState(status) || status.State == sql.StatementStateSucceeded {
return nil
}
out := &batchResultError{}
if status.Error != nil {
out.Message = status.Error.Message
out.ErrorCode = string(status.Error.ErrorCode)
} else {
out.Message = fmt.Sprintf("statement reached terminal state %s", status.State)
}
return out
}

func newStatementCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "statement",
Short: "Manage SQL statement lifecycle (submit, get, status, cancel)",
Long: `Low-level command tree for asynchronous SQL execution.

Use 'submit' to fire a statement and get its statement_id back, then
'get' to block on results, 'status' to peek without blocking, and
'cancel' to terminate. For "I want results now," use 'tools query'
instead.

All subcommands emit a JSON object with the statement_id and state.
'get' adds columns and rows on success; any subcommand may emit an
error object when the server reports a non-success terminal state.`,
}

cmd.AddCommand(newStatementSubmitCmd())
cmd.AddCommand(newStatementGetCmd())
cmd.AddCommand(newStatementStatusCmd())
cmd.AddCommand(newStatementCancelCmd())

return cmd
}
53 changes: 53 additions & 0 deletions experimental/aitools/cmd/statement_cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package aitools

import (
"context"
"fmt"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdctx"
"github.com/databricks/databricks-sdk-go/service/sql"
"github.com/spf13/cobra"
)

func newStatementCancelCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "cancel STATEMENT_ID",
Short: "Request cancellation of a running statement",
Long: `Send a cancellation request for the given statement_id. The Statements
API returns no body on cancel; this command optimistically reports
state=CANCELED on success. Use 'statement status' afterwards to confirm
the server-side state if you need certainty.`,
Example: ` databricks experimental aitools tools statement cancel 01ef...`,
Args: cobra.ExactArgs(1),
PreRunE: root.MustWorkspaceClient,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
w := cmdctx.WorkspaceClient(ctx)
statementID := args[0]

info, err := cancelStatementExecution(ctx, w.StatementExecution, statementID)
if err != nil {
return err
}
return renderStatementInfo(cmd.OutOrStdout(), info)
},
}

return cmd
}

// cancelStatementExecution issues CancelExecution and reports state=CANCELED on success.
// CancelExecution returns no body; the actual server-side state is verified
// asynchronously. Use 'statement status' to confirm if certainty is required.
func cancelStatementExecution(ctx context.Context, api sql.StatementExecutionInterface, statementID string) (statementInfo, error) {
if err := api.CancelExecution(ctx, sql.CancelExecutionRequest{
StatementId: statementID,
}); err != nil {
return statementInfo{}, fmt.Errorf("cancel statement: %w", err)
}
return statementInfo{
StatementID: statementID,
State: sql.StatementStateCanceled,
}, nil
}
96 changes: 96 additions & 0 deletions experimental/aitools/cmd/statement_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package aitools

import (
"context"
"fmt"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdctx"
"github.com/databricks/databricks-sdk-go/service/sql"
"github.com/spf13/cobra"
)

func newStatementGetCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "get STATEMENT_ID",
Short: "Block until a previously submitted statement is terminal and emit its result",
Long: `Poll a statement_id until it reaches a terminal state, then emit
columns and rows on success or an error object on failure.

Ctrl+C stops polling but does NOT cancel the server-side statement.
Use 'statement cancel <id>' to terminate explicitly. (This differs from
'tools query', which cancels server-side on Ctrl+C because the user
invoked the synchronous path.)`,
Example: ` databricks experimental aitools tools statement get 01ef...`,
Args: cobra.ExactArgs(1),
PreRunE: root.MustWorkspaceClient,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
w := cmdctx.WorkspaceClient(ctx)
statementID := args[0]

info, err := getStatementResult(ctx, w.StatementExecution, statementID)
if err != nil {
return err
}

if err := renderStatementInfo(cmd.OutOrStdout(), info); err != nil {
return err
}

// Non-zero exit when the statement reached a non-success terminal
// state OR a chunk-fetch failure prevented assembling the rows.
// In both cases the failure detail is already in the JSON output.
if info.State != sql.StatementStateSucceeded || info.Error != nil {
return root.ErrAlreadyPrinted
}
return nil
},
}

return cmd
}

// getStatementResult polls a statement until terminal, then assembles a
// statementInfo with rows on success or an error object on failure.
//
// Context cancellation propagates from pollStatement WITHOUT cancelling the
// server-side statement (intentional: 'get' is a poll-only operation; use
// 'cancel' to terminate explicitly).
func getStatementResult(ctx context.Context, api sql.StatementExecutionInterface, statementID string) (statementInfo, error) {
// Fetch the current state first so pollStatement can short-circuit if
// the statement is already terminal.
resp, err := api.GetStatementByStatementId(ctx, statementID)
if err != nil {
return statementInfo{}, fmt.Errorf("get statement: %w", err)
}

pollResp, err := pollStatement(ctx, api, resp)
if err != nil {
return statementInfo{}, err
}

info := statementInfo{StatementID: pollResp.StatementId}
if pollResp.Status != nil {
info.State = pollResp.Status.State
}
info.Error = statementErrorFromStatus(pollResp.Status)

if info.State == sql.StatementStateSucceeded {
info.Columns = extractColumns(pollResp.Manifest)
rows, err := fetchAllRows(ctx, api, pollResp)
if err != nil {
// The query succeeded server-side but a later chunk fetch failed
// (network blip, throttling, transient 5xx). Surface this as a
// structured error on the same statementInfo so the caller still
// gets a parseable JSON response with the statement_id; RunE then
// signals exit-non-zero based on info.Error.
info.Error = &batchResultError{
Message: fmt.Sprintf("fetch result rows: %v", err),
}
return info, nil
}
info.Rows = rows
}
return info, nil
}
52 changes: 52 additions & 0 deletions experimental/aitools/cmd/statement_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package aitools

import (
"context"
"fmt"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdctx"
"github.com/databricks/databricks-sdk-go/service/sql"
"github.com/spf13/cobra"
)

func newStatementStatusCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "status STATEMENT_ID",
Short: "Return the current state of a statement without polling",
Long: `Single GET against the Statements API. Use this to peek at progress
without blocking. For a blocking poll-until-terminal call, use
'statement get'.`,
Example: ` databricks experimental aitools tools statement status 01ef...`,
Args: cobra.ExactArgs(1),
PreRunE: root.MustWorkspaceClient,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
w := cmdctx.WorkspaceClient(ctx)
statementID := args[0]

info, err := getStatementStatus(ctx, w.StatementExecution, statementID)
if err != nil {
return err
}
return renderStatementInfo(cmd.OutOrStdout(), info)
},
}

return cmd
}

// getStatementStatus performs a single GET against the Statements API, no polling.
func getStatementStatus(ctx context.Context, api sql.StatementExecutionInterface, statementID string) (statementInfo, error) {
resp, err := api.GetStatementByStatementId(ctx, statementID)
if err != nil {
return statementInfo{}, fmt.Errorf("get statement: %w", err)
}

info := statementInfo{StatementID: resp.StatementId}
if resp.Status != nil {
info.State = resp.Status.State
}
info.Error = statementErrorFromStatus(resp.Status)
return info, nil
}
Loading