Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/session/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ func getAllMigrations() []Migration {
ALTER TABLE sessions DROP COLUMN split_diff_view;
`,
},
{
ID: 20,
Name: "020_drop_messages_column",
Description: "Drop the legacy messages JSON column now that all data lives in session_items",
UpSQL: `ALTER TABLE sessions DROP COLUMN messages`,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migration robustness: DROP COLUMN without existence check

Migration 020 uses ALTER TABLE sessions DROP COLUMN messages without checking if the column exists. SQLite doesn't support DROP COLUMN IF EXISTS syntax.

While this will work in normal operation (the initial schema creates the messages column, and migrations run sequentially), it could fail in edge cases:

  • Database corruption where the column was manually removed
  • Interrupted or failed previous migrations
  • Manual schema modifications

Consider adding a column existence check before dropping:

-- Check if column exists first
SELECT COUNT(*) FROM pragma_table_info('sessions') WHERE name='messages';
-- Only drop if it exists

Or wrap in a transaction with error handling that ignores "no such column" errors.

This is unlikely to affect normal users but could cause migration failures in edge cases.

},
}
}

Expand Down
91 changes: 1 addition & 90 deletions pkg/session/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,33 +329,6 @@ type SQLiteSessionStore struct {
db *sql.DB
}

// syncMessagesColumn rebuilds the messages JSON column from session_items for backward compatibility.
// This allows older versions of docker agent to read sessions created by newer versions.
func (s *SQLiteSessionStore) syncMessagesColumn(ctx context.Context, sessionID string) error {
return s.syncMessagesColumnWith(ctx, s.db, sessionID)
}

// syncMessagesColumnTx is like syncMessagesColumn but uses an existing transaction.
func (s *SQLiteSessionStore) syncMessagesColumnTx(ctx context.Context, tx *sql.Tx, sessionID string) error {
return s.syncMessagesColumnWith(ctx, tx, sessionID)
}

// syncMessagesColumnWith rebuilds the messages JSON column using the provided querier.
func (s *SQLiteSessionStore) syncMessagesColumnWith(ctx context.Context, q querier, sessionID string) error {
items, err := s.loadSessionItemsWith(ctx, q, sessionID)
if err != nil {
return fmt.Errorf("loading session items: %w", err)
}

messagesJSON, err := json.Marshal(items)
if err != nil {
return fmt.Errorf("marshaling messages: %w", err)
}

_, err = q.ExecContext(ctx, "UPDATE sessions SET messages = ? WHERE id = ?", string(messagesJSON), sessionID)
return err
}

// UpdateSessionTokens updates only token/cost fields.
func (s *InMemorySessionStore) UpdateSessionTokens(_ context.Context, sessionID string, inputTokens, outputTokens int64, cost float64) error {
if sessionID == "" {
Expand Down Expand Up @@ -695,8 +668,6 @@ type sessionItemRow struct {
}

// loadSessionItems loads all items for a session from the session_items table.
// If no items exist in session_items, it falls back to the legacy messages JSON column
// for backward compatibility with sessions created by older docker agent versions.
func (s *SQLiteSessionStore) loadSessionItems(ctx context.Context, sessionID string) ([]Item, error) {
return s.loadSessionItemsWith(ctx, s.db, sessionID)
}
Expand Down Expand Up @@ -727,9 +698,8 @@ func (s *SQLiteSessionStore) loadSessionItemsWith(ctx context.Context, q querier
}
rows.Close()

// If no session_items found, fall back to legacy messages column
if len(rawRows) == 0 {
return s.loadMessagesFromLegacyColumn(ctx, sessionID)
return nil, nil
}

// Now process the collected rows, making recursive calls as needed
Expand Down Expand Up @@ -799,38 +769,6 @@ func (s *SQLiteSessionStore) loadSessionWith(ctx context.Context, q querier, id
return sess, nil
}

// loadMessagesFromLegacyColumn loads messages from the legacy messages JSON column.
// This is used for backward compatibility with sessions created by older docker agent versions
// that haven't been migrated to the session_items table yet.
func (s *SQLiteSessionStore) loadMessagesFromLegacyColumn(ctx context.Context, sessionID string) ([]Item, error) {
var messagesJSON sql.NullString
err := s.db.QueryRowContext(ctx, "SELECT messages FROM sessions WHERE id = ?", sessionID).Scan(&messagesJSON)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
// Handle case where messages column doesn't exist (very old or corrupted database)
// This can happen if the database was created before the messages column was added
// or if migrations failed partially
if sqliteutil.IsNoSuchColumnError(err) {
slog.Warn("messages column not found in sessions table, returning empty messages", "session_id", sessionID)
return nil, nil
}
return nil, err
}

if !messagesJSON.Valid || messagesJSON.String == "" || messagesJSON.String == "[]" {
return nil, nil
}

var items []Item
if err := json.Unmarshal([]byte(messagesJSON.String), &items); err != nil {
return nil, fmt.Errorf("unmarshaling legacy messages: %w", err)
}

return items, nil
}

// GetSessions retrieves all root sessions (excludes sub-sessions)
func (s *SQLiteSessionStore) GetSessions(ctx context.Context) ([]*Session, error) {
rows, err := s.db.QueryContext(ctx,
Expand Down Expand Up @@ -1073,11 +1011,6 @@ func (s *SQLiteSessionStore) AddMessage(ctx context.Context, sessionID string, m
return 0, fmt.Errorf("getting last insert id: %w", err)
}

// Update messages column for backward compatibility with older docker agent versions
if err := s.syncMessagesColumn(ctx, sessionID); err != nil {
slog.Warn("[STORE] Failed to sync messages column", "session_id", sessionID, "error", err)
}

slog.Debug("[STORE] AddMessage", "session_id", sessionID, "message_id", id, "role", msg.Message.Role, "agent", msg.AgentName)
return id, nil
}
Expand Down Expand Up @@ -1105,15 +1038,6 @@ func (s *SQLiteSessionStore) UpdateMessage(ctx context.Context, messageID int64,
return ErrNotFound
}

// Get session ID for this message to sync the messages column
var sessionID string
err = s.db.QueryRowContext(ctx, "SELECT session_id FROM session_items WHERE id = ?", messageID).Scan(&sessionID)
if err == nil {
if syncErr := s.syncMessagesColumn(ctx, sessionID); syncErr != nil {
slog.Warn("[STORE] Failed to sync messages column", "session_id", sessionID, "error", syncErr)
}
}

return nil
}

Expand Down Expand Up @@ -1155,14 +1079,6 @@ func (s *SQLiteSessionStore) AddSubSession(ctx context.Context, parentSessionID
return fmt.Errorf("inserting subsession reference: %w", err)
}

// 5. Update messages column for both parent and sub-session for backward compatibility
if err := s.syncMessagesColumnTx(ctx, tx, parentSessionID); err != nil {
slog.Warn("[STORE] Failed to sync parent messages column", "session_id", parentSessionID, "error", err)
}
if err := s.syncMessagesColumnTx(ctx, tx, subSession.ID); err != nil {
slog.Warn("[STORE] Failed to sync sub-session messages column", "session_id", subSession.ID, "error", err)
}

return tx.Commit()
}

Expand Down Expand Up @@ -1276,11 +1192,6 @@ func (s *SQLiteSessionStore) AddSummary(ctx context.Context, sessionID, summary
return err
}

// Update messages column for backward compatibility with older docker agent versions
if syncErr := s.syncMessagesColumn(ctx, sessionID); syncErr != nil {
slog.Warn("[STORE] Failed to sync messages column", "session_id", sessionID, "error", syncErr)
}

return nil
}

Expand Down
195 changes: 0 additions & 195 deletions pkg/session/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,201 +709,6 @@ func TestBackupDatabase(t *testing.T) {
})
}

// TestBackwardCompatibility_ReadLegacyMessages verifies that new code can read
// sessions that were created by older docker agent versions (messages in JSON column only).
func TestBackwardCompatibility_ReadLegacyMessages(t *testing.T) {
tempDB := filepath.Join(t.TempDir(), "test_legacy.db")

store, err := NewSQLiteSessionStore(tempDB)
require.NoError(t, err)
defer store.(*SQLiteSessionStore).Close()

sqliteStore := store.(*SQLiteSessionStore)

// Simulate a legacy session by inserting directly into the sessions table
// with messages in the JSON column and NO entries in session_items
legacyMessages := []Item{
NewMessageItem(UserMessage("Hello from legacy")),
NewMessageItem(&Message{
AgentName: "test-agent",
Message: chat.Message{
Role: chat.MessageRoleAssistant,
Content: "Hi from legacy agent!",
},
}),
}

legacyMessagesJSON, err := json.Marshal(legacyMessages)
require.NoError(t, err)

_, err = sqliteStore.db.ExecContext(t.Context(),
`INSERT INTO sessions (id, messages, tools_approved, input_tokens, output_tokens, title, cost, send_user_message, max_iterations, working_dir, created_at, starred, permissions, agent_model_overrides, custom_models_used, thinking)
VALUES (?, ?, 0, 0, 0, 'Legacy Session', 0, 1, 0, '', ?, 0, '', '{}', '[]', 1)`,
"legacy-session", string(legacyMessagesJSON), time.Now().Format(time.RFC3339))
require.NoError(t, err)

// Now read the session using the store API - it should fall back to messages column
retrieved, err := store.GetSession(t.Context(), "legacy-session")
require.NoError(t, err)
require.NotNil(t, retrieved)

// Verify messages were read from the legacy column
assert.Len(t, retrieved.Messages, 2)
assert.Equal(t, "Hello from legacy", retrieved.Messages[0].Message.Message.Content)
assert.Equal(t, "test-agent", retrieved.Messages[1].Message.AgentName)
assert.Equal(t, "Hi from legacy agent!", retrieved.Messages[1].Message.Message.Content)
}

// TestForwardCompatibility_MessagesColumnPopulated verifies that new code populates
// the messages column so older docker agent versions can read sessions.
func TestForwardCompatibility_MessagesColumnPopulated(t *testing.T) {
tempDB := filepath.Join(t.TempDir(), "test_forward.db")

store, err := NewSQLiteSessionStore(tempDB)
require.NoError(t, err)
defer store.(*SQLiteSessionStore).Close()

sqliteStore := store.(*SQLiteSessionStore)

// Create a session using the new API
session := &Session{
ID: "new-session",
CreatedAt: time.Now(),
}
err = store.AddSession(t.Context(), session)
require.NoError(t, err)

// Add messages using the new granular API
_, err = store.AddMessage(t.Context(), "new-session", UserMessage("Hello from new code"))
require.NoError(t, err)

_, err = store.AddMessage(t.Context(), "new-session", &Message{
AgentName: "new-agent",
Message: chat.Message{
Role: chat.MessageRoleAssistant,
Content: "Response from new agent",
},
})
require.NoError(t, err)

// Verify messages column is populated (how old docker agent would read it)
var messagesJSON string
err = sqliteStore.db.QueryRowContext(t.Context(),
"SELECT messages FROM sessions WHERE id = ?", "new-session").Scan(&messagesJSON)
require.NoError(t, err)
assert.NotEmpty(t, messagesJSON)
assert.NotEqual(t, "[]", messagesJSON)

// Parse and verify the messages column content
var items []Item
err = json.Unmarshal([]byte(messagesJSON), &items)
require.NoError(t, err)

assert.Len(t, items, 2)
assert.Equal(t, "Hello from new code", items[0].Message.Message.Content)
assert.Equal(t, "new-agent", items[1].Message.AgentName)
assert.Equal(t, "Response from new agent", items[1].Message.Message.Content)
}

// TestForwardCompatibility_SubSessionPopulated verifies that sub-sessions
// are properly serialized to the messages column for backward compatibility.
func TestForwardCompatibility_SubSessionPopulated(t *testing.T) {
tempDB := filepath.Join(t.TempDir(), "test_subsession.db")

store, err := NewSQLiteSessionStore(tempDB)
require.NoError(t, err)
defer store.(*SQLiteSessionStore).Close()

sqliteStore := store.(*SQLiteSessionStore)

// Create parent session
parentSession := &Session{
ID: "parent-session",
CreatedAt: time.Now(),
}
err = store.AddSession(t.Context(), parentSession)
require.NoError(t, err)

// Add a message to parent
_, err = store.AddMessage(t.Context(), "parent-session", UserMessage("Start task"))
require.NoError(t, err)

// Create and add a sub-session
subSession := &Session{
ID: "sub-session",
CreatedAt: time.Now(),
Messages: []Item{
NewMessageItem(UserMessage("Sub task")),
NewMessageItem(&Message{
AgentName: "sub-agent",
Message: chat.Message{
Role: chat.MessageRoleAssistant,
Content: "Sub response",
},
}),
},
}
err = store.AddSubSession(t.Context(), "parent-session", subSession)
require.NoError(t, err)

// Verify parent's messages column contains the sub-session
var messagesJSON string
err = sqliteStore.db.QueryRowContext(t.Context(),
"SELECT messages FROM sessions WHERE id = ?", "parent-session").Scan(&messagesJSON)
require.NoError(t, err)

var items []Item
err = json.Unmarshal([]byte(messagesJSON), &items)
require.NoError(t, err)

assert.Len(t, items, 2) // user message + subsession
assert.Equal(t, "Start task", items[0].Message.Message.Content)
assert.NotNil(t, items[1].SubSession)
assert.Equal(t, "sub-session", items[1].SubSession.ID)
assert.Len(t, items[1].SubSession.Messages, 2)
}

// TestForwardCompatibility_SummaryPopulated verifies that summaries
// are properly serialized to the messages column for backward compatibility.
func TestForwardCompatibility_SummaryPopulated(t *testing.T) {
tempDB := filepath.Join(t.TempDir(), "test_summary.db")

store, err := NewSQLiteSessionStore(tempDB)
require.NoError(t, err)
defer store.(*SQLiteSessionStore).Close()

sqliteStore := store.(*SQLiteSessionStore)

// Create session
session := &Session{
ID: "summary-session",
CreatedAt: time.Now(),
}
err = store.AddSession(t.Context(), session)
require.NoError(t, err)

// Add messages and a summary
_, err = store.AddMessage(t.Context(), "summary-session", UserMessage("Hello"))
require.NoError(t, err)

err = store.AddSummary(t.Context(), "summary-session", "This is a summary of the conversation.")
require.NoError(t, err)

// Verify messages column contains the summary
var messagesJSON string
err = sqliteStore.db.QueryRowContext(t.Context(),
"SELECT messages FROM sessions WHERE id = ?", "summary-session").Scan(&messagesJSON)
require.NoError(t, err)

var items []Item
err = json.Unmarshal([]byte(messagesJSON), &items)
require.NoError(t, err)

assert.Len(t, items, 2)
assert.Equal(t, "Hello", items[0].Message.Message.Content)
assert.Equal(t, "This is a summary of the conversation.", items[1].Summary)
}

// TestOrphanedSubsessionReference verifies that loading sessions gracefully
// handles orphaned subsession references (where the subsession was deleted
// but the reference in session_items remains).
Expand Down
10 changes: 0 additions & 10 deletions pkg/sqliteutil/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,6 @@ func IsCantOpenError(err error) bool {
return false
}

// IsNoSuchColumnError checks if the error is due to a missing column in SQLite.
// This typically happens when querying a column that doesn't exist in the schema.
func IsNoSuchColumnError(err error) bool {
if sqliteErr, ok := errors.AsType[*sqlite.Error](err); ok {
// SQLITE_ERROR (1) is the generic SQL error code used for "no such column"
return sqliteErr.Code() == sqlite3.SQLITE_ERROR
}
return false
}

// DiagnoseDBOpenError provides a more helpful error message when SQLite
// fails to open/create a database file.
func DiagnoseDBOpenError(path string, originalErr error) error {
Expand Down
Loading