Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 189 additions & 12 deletions components/backend/handlers/projects.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"os"
"regexp"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -142,9 +143,22 @@ func GetClusterInfo(c *gin.Context) {
})
}

// accessCheckResult holds the result of a parallel access check
type accessCheckResult struct {
namespace *corev1.Namespace
hasAccess bool
err error
cancelled bool // Context was cancelled before check completed
}

// parallelSSARWorkerCount is the number of concurrent SSAR checks
const parallelSSARWorkerCount = 10

// ListProjects handles GET /projects
// Lists Namespaces (both platforms) using backend SA with label selector,
// then uses SubjectAccessReview to verify user access to each namespace
// then uses SubjectAccessReview to verify user access to each namespace.
// Supports pagination via limit/offset and search filtering.
// SSAR checks are performed in parallel for improved performance.
func ListProjects(c *gin.Context) {
reqK8s, _ := GetK8sClientsForRequest(c)

Expand All @@ -153,16 +167,23 @@ func ListProjects(c *gin.Context) {
return
}

// Parse pagination parameters
var params types.PaginationParams
if err := c.ShouldBindQuery(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid pagination parameters"})
return
}
types.NormalizePaginationParams(&params)

// List namespaces using backend SA (both platforms)
if K8sClientProjects == nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list projects"})
return
}

isOpenShift := isOpenShiftCluster()
projects := []types.AmbientProject{}

ctx, cancel := context.WithTimeout(context.Background(), defaultK8sTimeout)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Increased timeout for parallel checks
defer cancel()

nsList, err := K8sClientProjects.CoreV1().Namespaces().List(ctx, v1.ListOptions{
Expand All @@ -174,21 +195,177 @@ func ListProjects(c *gin.Context) {
return
}

// Filter to only namespaces where user has access
// Use SubjectAccessReview - checks ALL RBAC sources (any RoleBinding, group, etc.)
for _, ns := range nsList.Items {
hasAccess, err := checkUserCanAccessNamespace(reqK8s, ns.Name)
if err != nil {
log.Printf("Failed to check access for namespace %s: %v", ns.Name, err)
// Pre-filter by search term if provided (before SSAR checks to reduce work)
filteredNamespaces := filterNamespacesBySearch(nsList.Items, params.Search, isOpenShift)

// Perform parallel SSAR checks using worker pool
accessibleProjects := performParallelSSARChecks(ctx, reqK8s, filteredNamespaces, isOpenShift)

// Sort by creation timestamp (newest first)
sortProjectsByCreationTime(accessibleProjects)

// Apply pagination
totalCount := len(accessibleProjects)
paginatedProjects, hasMore, nextOffset := paginateProjects(accessibleProjects, params.Offset, params.Limit)

response := types.PaginatedResponse{
Items: paginatedProjects,
TotalCount: totalCount,
Limit: params.Limit,
Offset: params.Offset,
HasMore: hasMore,
}
if hasMore {
response.NextOffset = &nextOffset
}

c.JSON(http.StatusOK, response)
}

// filterNamespacesBySearch filters namespaces by search term (name or displayName)
func filterNamespacesBySearch(namespaces []corev1.Namespace, search string, isOpenShift bool) []corev1.Namespace {
if search == "" {
return namespaces
}

searchLower := strings.ToLower(search)
filtered := make([]corev1.Namespace, 0, len(namespaces))

for _, ns := range namespaces {
// Match against name
if strings.Contains(strings.ToLower(ns.Name), searchLower) {
filtered = append(filtered, ns)
continue
}

if hasAccess {
projects = append(projects, projectFromNamespace(&ns, isOpenShift))
// On OpenShift, also match against displayName
if isOpenShift && ns.Annotations != nil {
displayName := ns.Annotations["openshift.io/display-name"]
if strings.Contains(strings.ToLower(displayName), searchLower) {
filtered = append(filtered, ns)
continue
}
}
}

c.JSON(http.StatusOK, gin.H{"items": projects})
return filtered
}

// performParallelSSARChecks performs SSAR checks in parallel using a worker pool
func performParallelSSARChecks(ctx context.Context, reqK8s *kubernetes.Clientset, namespaces []corev1.Namespace, isOpenShift bool) []types.AmbientProject {
if len(namespaces) == 0 {
return []types.AmbientProject{}
}

// Determine worker count (don't exceed number of namespaces)
workerCount := parallelSSARWorkerCount
if len(namespaces) < workerCount {
workerCount = len(namespaces)
}

// Channel for namespace work items
workChan := make(chan *corev1.Namespace, len(namespaces))
// Channel for results
resultChan := make(chan accessCheckResult, len(namespaces))

// Start worker goroutines
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for ns := range workChan {
// Check context cancellation
select {
case <-ctx.Done():
// Report cancellation so caller can return partial results
resultChan <- accessCheckResult{
namespace: ns,
cancelled: true,
}
// Drain remaining work items without processing
for range workChan {
resultChan <- accessCheckResult{cancelled: true}
}
return
default:
}

hasAccess, err := checkUserCanAccessNamespace(reqK8s, ns.Name)
resultChan <- accessCheckResult{
namespace: ns,
hasAccess: hasAccess,
err: err,
}
}
}()
}

// Send work items
for i := range namespaces {
workChan <- &namespaces[i]
}
close(workChan)

// Wait for all workers to finish and close result channel
go func() {
wg.Wait()
close(resultChan)
}()

// Collect results and track cancellations
projects := make([]types.AmbientProject, 0, len(namespaces))
cancelledCount := 0
for result := range resultChan {
if result.cancelled {
cancelledCount++
continue
}
if result.err != nil {
log.Printf("Failed to check access for namespace %s: %v", result.namespace.Name, result.err)
continue
}
if result.hasAccess {
projects = append(projects, projectFromNamespace(result.namespace, isOpenShift))
}
}

if cancelledCount > 0 {
log.Printf("Warning: %d SSAR checks were cancelled due to context timeout", cancelledCount)
}

return projects
}

// sortProjectsByCreationTime sorts projects by creation timestamp (newest first)
func sortProjectsByCreationTime(projects []types.AmbientProject) {
// Use sort.Slice for O(n log n) performance
// RFC3339 timestamps sort lexicographically
sort.Slice(projects, func(i, j int) bool {
return projects[i].CreationTimestamp > projects[j].CreationTimestamp
})
}

// paginateProjects applies offset/limit pagination to the project list
func paginateProjects(projects []types.AmbientProject, offset, limit int) ([]types.AmbientProject, bool, int) {
total := len(projects)

// Handle offset beyond available items
if offset >= total {
return []types.AmbientProject{}, false, 0
}

// Calculate end index
end := offset + limit
if end > total {
end = total
}

// Determine if there are more items
hasMore := end < total
nextOffset := end

return projects[offset:end], hasMore, nextOffset
}

// projectFromNamespace converts a Kubernetes Namespace to AmbientProject
Expand Down
117 changes: 115 additions & 2 deletions components/backend/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"os"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -300,7 +301,21 @@ func ListSessions(c *gin.Context) {
_ = reqK8s
gvr := GetAgenticSessionV1Alpha1Resource()

list, err := reqDyn.Resource(gvr).Namespace(project).List(context.TODO(), v1.ListOptions{})
// Parse pagination parameters
var params types.PaginationParams
if err := c.ShouldBindQuery(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid pagination parameters"})
return
}
types.NormalizePaginationParams(&params)

// Build list options with pagination
// Note: Kubernetes List with Limit returns a continue token for server-side pagination
// We use offset-based pagination on top of fetching all items for search/sort flexibility
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

list, err := reqDyn.Resource(gvr).Namespace(project).List(ctx, v1.ListOptions{})
if err != nil {
log.Printf("Failed to list agentic sessions in project %s: %v", project, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"})
Expand All @@ -326,7 +341,105 @@ func ListSessions(c *gin.Context) {
sessions = append(sessions, session)
}

c.JSON(http.StatusOK, gin.H{"items": sessions})
// Apply search filter if provided
if params.Search != "" {
sessions = filterSessionsBySearch(sessions, params.Search)
}

// Sort by creation timestamp (newest first)
sortSessionsByCreationTime(sessions)

// Apply pagination
totalCount := len(sessions)
paginatedSessions, hasMore, nextOffset := paginateSessions(sessions, params.Offset, params.Limit)

response := types.PaginatedResponse{
Items: paginatedSessions,
TotalCount: totalCount,
Limit: params.Limit,
Offset: params.Offset,
HasMore: hasMore,
}
if hasMore {
response.NextOffset = &nextOffset
}

c.JSON(http.StatusOK, response)
}

// filterSessionsBySearch filters sessions by search term (name or displayName)
func filterSessionsBySearch(sessions []types.AgenticSession, search string) []types.AgenticSession {
if search == "" {
return sessions
}

searchLower := strings.ToLower(search)
filtered := make([]types.AgenticSession, 0, len(sessions))

for _, session := range sessions {
// Match against name
if name, ok := session.Metadata["name"].(string); ok {
if strings.Contains(strings.ToLower(name), searchLower) {
filtered = append(filtered, session)
continue
}
}

// Match against displayName in spec
if strings.Contains(strings.ToLower(session.Spec.DisplayName), searchLower) {
filtered = append(filtered, session)
continue
}

// Match against initialPrompt
if strings.Contains(strings.ToLower(session.Spec.InitialPrompt), searchLower) {
filtered = append(filtered, session)
continue
}
}

return filtered
}

// sortSessionsByCreationTime sorts sessions by creation timestamp (newest first)
func sortSessionsByCreationTime(sessions []types.AgenticSession) {
// Use sort.Slice for O(n log n) performance
sort.Slice(sessions, func(i, j int) bool {
ts1 := getSessionCreationTimestamp(sessions[i])
ts2 := getSessionCreationTimestamp(sessions[j])
// Sort descending (newest first) - RFC3339 timestamps sort lexicographically
return ts1 > ts2
})
}

// getSessionCreationTimestamp extracts the creation timestamp from session metadata
func getSessionCreationTimestamp(session types.AgenticSession) string {
if ts, ok := session.Metadata["creationTimestamp"].(string); ok {
return ts
}
return ""
}

// paginateSessions applies offset/limit pagination to the session list
func paginateSessions(sessions []types.AgenticSession, offset, limit int) ([]types.AgenticSession, bool, int) {
total := len(sessions)

// Handle offset beyond available items
if offset >= total {
return []types.AgenticSession{}, false, 0
}

// Calculate end index
end := offset + limit
if end > total {
end = total
}

// Determine if there are more items
hasMore := end < total
nextOffset := end

return sessions[offset:end], hasMore, nextOffset
}

func CreateSession(c *gin.Context) {
Expand Down
Loading
Loading