diff --git a/.gitignore b/.gitignore index 7ecae2875..2022398b1 100644 --- a/.gitignore +++ b/.gitignore @@ -58,6 +58,7 @@ artifacts/ **/Properties/launchSettings.json !**/*.Host/Properties/launchSettings.json !**/*.AppHost/Properties/launchSettings.json +!memex/aspire/Memex.Database.Migration/Properties/launchSettings.json # StyleCop StyleCopReport.xml @@ -367,8 +368,5 @@ samples/Graph/Data/VUser/ # User activity data **/_useractivity/ -# Claude Code personal settings -.claude/settings.local.json - -# Claude Code scheduled tasks lock file -.claude/scheduled_tasks.lock +# Claude Code per-user local state (settings.local.json, plans/, scheduled_tasks.lock, etc.) +.claude/ diff --git a/MeshWeaver.slnx b/MeshWeaver.slnx index e9a5a7021..ff9776e06 100644 --- a/MeshWeaver.slnx +++ b/MeshWeaver.slnx @@ -143,6 +143,7 @@ + diff --git a/memex/Memex.Portal.Monolith/Program.cs b/memex/Memex.Portal.Monolith/Program.cs index 08c6a1ce2..7327e6da5 100644 --- a/memex/Memex.Portal.Monolith/Program.cs +++ b/memex/Memex.Portal.Monolith/Program.cs @@ -58,7 +58,9 @@ .AddFileSystemDataSource("Cornerstone", "Cornerstone", Path.Combine(graphBasePath, "Cornerstone"), "Sample Cornerstone data") .AddFileSystemDataSource("FutuRe", "FutuRe", - Path.Combine(graphBasePath, "FutuRe"), "Sample FutuRe reinsurance data"); + Path.Combine(graphBasePath, "FutuRe"), "Sample FutuRe reinsurance data") + .AddFileSystemDataSource("SocialMedia", "Social Media", + Path.Combine(graphBasePath, "SocialMedia"), "Social media post planning demo"); } return config.UseMonolithMesh(); diff --git a/memex/Memex.Portal.Shared/Authentication/OAuthCodeStore.cs b/memex/Memex.Portal.Shared/Authentication/OAuthCodeStore.cs new file mode 100644 index 000000000..0e2dadb0f --- /dev/null +++ b/memex/Memex.Portal.Shared/Authentication/OAuthCodeStore.cs @@ -0,0 +1,121 @@ +using System.Collections.Concurrent; +using System.Security.Cryptography; +using System.Text; + +namespace Memex.Portal.Shared.Authentication; + +/// +/// In-memory store for OAuth authorization codes with PKCE support. +/// Codes expire after 5 minutes and are single-use (consumed on exchange). +/// Uses ConcurrentDictionary for thread-safe mutation (per CLAUDE.md exception). +/// +internal class OAuthCodeStore +{ + private readonly ConcurrentDictionary _codes = new(); + private static readonly TimeSpan CodeLifetime = TimeSpan.FromMinutes(5); + + /// + /// Generates a new authorization code and stores it with the given parameters. + /// + public string GenerateCode( + string userId, + string userName, + string userEmail, + string clientId, + string redirectUri, + string? codeChallenge, + string? codeChallengeMethod) + { + // Clean up expired codes opportunistically + CleanupExpired(); + + var code = Convert.ToBase64String(RandomNumberGenerator.GetBytes(32)) + .Replace("+", "-").Replace("/", "_").TrimEnd('='); + + var entry = new AuthorizationCode + { + Code = code, + UserId = userId, + UserName = userName, + UserEmail = userEmail, + ClientId = clientId, + RedirectUri = redirectUri, + CodeChallenge = codeChallenge, + CodeChallengeMethod = codeChallengeMethod, + CreatedAt = DateTimeOffset.UtcNow, + }; + + _codes[code] = entry; + return code; + } + + /// + /// Exchanges an authorization code for the stored entry. + /// Returns null if the code is invalid, expired, or already consumed. + /// Validates PKCE code_verifier if a code_challenge was stored. + /// + public AuthorizationCode? ExchangeCode(string code, string clientId, string redirectUri, string? codeVerifier) + { + if (!_codes.TryRemove(code, out var entry)) + return null; + + // Check expiry + if (DateTimeOffset.UtcNow - entry.CreatedAt > CodeLifetime) + return null; + + // Validate client_id and redirect_uri match + if (!string.Equals(entry.ClientId, clientId, StringComparison.Ordinal)) + return null; + if (!string.Equals(entry.RedirectUri, redirectUri, StringComparison.Ordinal)) + return null; + + // Validate PKCE + if (!string.IsNullOrEmpty(entry.CodeChallenge)) + { + if (string.IsNullOrEmpty(codeVerifier)) + return null; + + if (!VerifyPkce(codeVerifier, entry.CodeChallenge, entry.CodeChallengeMethod)) + return null; + } + + return entry; + } + + private static bool VerifyPkce(string codeVerifier, string codeChallenge, string? method) + { + if (string.Equals(method, "S256", StringComparison.OrdinalIgnoreCase)) + { + var hash = SHA256.HashData(Encoding.ASCII.GetBytes(codeVerifier)); + var computed = Convert.ToBase64String(hash) + .Replace("+", "-").Replace("/", "_").TrimEnd('='); + return string.Equals(computed, codeChallenge, StringComparison.Ordinal); + } + + // plain method (or no method specified) + return string.Equals(codeVerifier, codeChallenge, StringComparison.Ordinal); + } + + private void CleanupExpired() + { + var cutoff = DateTimeOffset.UtcNow - CodeLifetime; + foreach (var kvp in _codes) + { + if (kvp.Value.CreatedAt < cutoff) + _codes.TryRemove(kvp.Key, out _); + } + } +} + +internal record AuthorizationCode +{ + public required string Code { get; init; } + public required string UserId { get; init; } + public required string UserName { get; init; } + public required string UserEmail { get; init; } + public required string ClientId { get; init; } + public required string RedirectUri { get; init; } + public string? CodeChallenge { get; init; } + public string? CodeChallengeMethod { get; init; } + public DateTimeOffset CreatedAt { get; init; } +} diff --git a/memex/Memex.Portal.Shared/Authentication/OAuthConnectController.cs b/memex/Memex.Portal.Shared/Authentication/OAuthConnectController.cs new file mode 100644 index 000000000..c2f12aa05 --- /dev/null +++ b/memex/Memex.Portal.Shared/Authentication/OAuthConnectController.cs @@ -0,0 +1,159 @@ +using System.Security.Claims; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Memex.Portal.Shared.Authentication; + +/// +/// Minimal OAuth 2.0 authorization server for MCP clients (claude.ai Connectors, Claude Desktop). +/// Implements authorization code flow with PKCE. Issues mw_ API tokens as access tokens, +/// reusing the existing ApiTokenService infrastructure. +/// +[ApiController] +public class OAuthConnectController( + IServiceProvider serviceProvider, + ILogger logger) : ControllerBase +{ + private OAuthCodeStore CodeStore => serviceProvider.GetRequiredService(); + private ApiTokenService TokenService => serviceProvider.GetRequiredService(); + + /// + /// RFC 8414 — OAuth Authorization Server Metadata. + /// MCP clients discover this via the authorization_servers URL from the protected resource metadata. + /// + [HttpGet("/.well-known/oauth-authorization-server")] + [AllowAnonymous] + public IActionResult GetServerMetadata() + { + var origin = $"{Request.Scheme}://{Request.Host}"; + return Ok(new + { + issuer = $"{origin}/connect", + authorization_endpoint = $"{origin}/connect/authorize", + token_endpoint = $"{origin}/connect/token", + response_types_supported = new[] { "code" }, + grant_types_supported = new[] { "authorization_code" }, + code_challenge_methods_supported = new[] { "S256" }, + }); + } + + /// + /// OAuth Authorization Endpoint — redirects authenticated users to the client's redirect_uri + /// with an authorization code. Unauthenticated users are sent to /login first. + /// + [HttpGet("connect/authorize")] + public IActionResult Authorize( + [FromQuery] string response_type, + [FromQuery] string client_id, + [FromQuery] string redirect_uri, + [FromQuery] string? state, + [FromQuery] string? scope, + [FromQuery] string? code_challenge, + [FromQuery] string? code_challenge_method) + { + if (response_type != "code") + return BadRequest(new { error = "unsupported_response_type" }); + + if (string.IsNullOrEmpty(client_id) || string.IsNullOrEmpty(redirect_uri)) + return BadRequest(new { error = "invalid_request", error_description = "client_id and redirect_uri are required" }); + + // If user is not authenticated, redirect to login with return URL + if (User?.Identity?.IsAuthenticated != true) + { + var authorizeUrl = $"{Request.Scheme}://{Request.Host}{Request.Path}{Request.QueryString}"; + var loginUrl = $"/login?returnUrl={Uri.EscapeDataString(authorizeUrl)}"; + return Redirect(loginUrl); + } + + // Extract user identity from cookie claims + var email = User.FindFirstValue(ClaimTypes.Email) + ?? User.FindFirstValue("email") + ?? User.FindFirstValue("preferred_username") + ?? ""; + var name = User.FindFirstValue(ClaimTypes.Name) + ?? User.FindFirstValue("name") + ?? email; + var userId = User.FindFirstValue("preferred_username") + ?? email; + + if (string.IsNullOrEmpty(email)) + return BadRequest(new { error = "invalid_request", error_description = "Unable to determine user identity" }); + + // Generate authorization code + var code = CodeStore.GenerateCode( + userId: userId, + userName: name, + userEmail: email, + clientId: client_id, + redirectUri: redirect_uri, + codeChallenge: code_challenge, + codeChallengeMethod: code_challenge_method); + + logger.LogInformation("Issued OAuth authorization code for user {Email}, client {ClientId}", email, client_id); + + // Redirect to client with code (and state if provided) + var callbackUrl = string.IsNullOrEmpty(state) + ? $"{redirect_uri}?code={Uri.EscapeDataString(code)}" + : $"{redirect_uri}?code={Uri.EscapeDataString(code)}&state={Uri.EscapeDataString(state)}"; + + return Redirect(callbackUrl); + } + + /// + /// OAuth Token Endpoint — exchanges an authorization code for an API token. + /// The issued token is a standard mw_ API token, indistinguishable from manually created ones. + /// + [HttpPost("connect/token")] + [AllowAnonymous] + public async Task ExchangeToken([FromForm] TokenRequest request) + { + if (request.grant_type != "authorization_code") + return BadRequest(new { error = "unsupported_grant_type" }); + + if (string.IsNullOrEmpty(request.code) || string.IsNullOrEmpty(request.client_id) || string.IsNullOrEmpty(request.redirect_uri)) + return BadRequest(new { error = "invalid_request" }); + + var entry = CodeStore.ExchangeCode( + request.code, + request.client_id, + request.redirect_uri, + request.code_verifier); + + if (entry == null) + { + logger.LogWarning("OAuth token exchange failed: invalid or expired code for client {ClientId}", request.client_id); + return BadRequest(new { error = "invalid_grant" }); + } + + // Create an mw_ API token via the existing token service + var (rawToken, _) = await TokenService.CreateTokenAsync( + userId: entry.UserId, + userName: entry.UserName, + userEmail: entry.UserEmail, + label: $"OAuth: {request.client_id}", + expiresAt: DateTimeOffset.UtcNow.AddDays(30)); + + logger.LogInformation("Issued OAuth access token for user {Email}, client {ClientId}", entry.UserEmail, request.client_id); + + return Ok(new + { + access_token = rawToken, + token_type = "Bearer", + expires_in = (int)TimeSpan.FromDays(30).TotalSeconds, + }); + } +} + +/// +/// Binds the form-encoded token request body. +/// +public class TokenRequest +{ + public string grant_type { get; set; } = ""; + public string? code { get; set; } + public string? client_id { get; set; } + public string? redirect_uri { get; set; } + public string? code_verifier { get; set; } +} diff --git a/memex/Memex.Portal.Shared/Memex.Portal.Shared.csproj b/memex/Memex.Portal.Shared/Memex.Portal.Shared.csproj index 271494c04..74f5b3aa5 100644 --- a/memex/Memex.Portal.Shared/Memex.Portal.Shared.csproj +++ b/memex/Memex.Portal.Shared/Memex.Portal.Shared.csproj @@ -31,6 +31,7 @@ + diff --git a/memex/Memex.Portal.Shared/MemexConfiguration.cs b/memex/Memex.Portal.Shared/MemexConfiguration.cs index 267713855..83fcaef1e 100644 --- a/memex/Memex.Portal.Shared/MemexConfiguration.cs +++ b/memex/Memex.Portal.Shared/MemexConfiguration.cs @@ -42,6 +42,8 @@ using Microsoft.Extensions.Logging; using Microsoft.Identity.Web; using Microsoft.Identity.Web.UI; +using ModelContextProtocol.AspNetCore.Authentication; +using ModelContextProtocol.Authentication; using PortalAuthOptions = MeshWeaver.Blazor.Portal.Authentication.AuthenticationOptions; namespace Memex.Portal.Shared; @@ -117,8 +119,9 @@ public static void ConfigureMemexServices(this WebApplicationBuilder builder) services.Configure( builder.Configuration.GetSection("Styles")); - // Register API token service for MCP bearer auth + // Register API token service for MCP bearer auth and OAuth code store services.AddSingleton(); + services.AddSingleton(); // Configure authentication var authSection = builder.Configuration.GetSection(PortalAuthOptions.SectionName); @@ -163,7 +166,8 @@ public static void ConfigureMemexServices(this WebApplicationBuilder builder) .AddMicrosoftIdentityWebApp(entraIdConfig); services.AddAuthentication() .AddScheme( - ApiTokenAuthenticationHandler.SchemeName, _ => { }); + ApiTokenAuthenticationHandler.SchemeName, _ => { }) + .AddMcp(ConfigureMcpResourceMetadata); services.AddControllersWithViews() .AddMicrosoftIdentityUI(); } @@ -196,20 +200,67 @@ public static void ConfigureMemexServices(this WebApplicationBuilder builder) // Add API token auth scheme for MCP bearer authentication authBuilder.AddScheme( - ApiTokenAuthenticationHandler.SchemeName, _ => { }); + ApiTokenAuthenticationHandler.SchemeName, _ => { }) + .AddMcp(ConfigureMcpResourceMetadata); } - // Add authorization with McpAuth policy (ApiToken scheme only — no cookie redirects for API clients) + // Add authorization with McpAuth policy (MCP scheme forwards to ApiToken or Cookie) services.AddAuthorization(options => { options.AddPolicy("McpAuth", policy => { - policy.AddAuthenticationSchemes(ApiTokenAuthenticationHandler.SchemeName); + policy.AddAuthenticationSchemes(McpAuthenticationDefaults.AuthenticationScheme); policy.RequireAuthenticatedUser(); }); }); } + /// + /// Configures the MCP authentication scheme with OAuth resource metadata discovery + /// and request-based forwarding to the appropriate authentication handler. + /// + private static void ConfigureMcpResourceMetadata(McpAuthenticationOptions options) + { + // CRITICAL: SDK constructor sets ForwardAuthenticate = "Bearer" which takes + // priority over ForwardDefaultSelector in ASP.NET Core's ResolveTarget(). + // Clear it so our selector works. + options.ForwardAuthenticate = null; + + // Route Bearer tokens to ApiToken handler, everything else to Cookie + options.ForwardDefaultSelector = ctx => + { + var authHeader = ctx.Request.Headers.Authorization.ToString(); + if (!string.IsNullOrEmpty(authHeader) && + authHeader.StartsWith("Bearer ", StringComparison.OrdinalIgnoreCase)) + return ApiTokenAuthenticationHandler.SchemeName; + return CookieAuthenticationDefaults.AuthenticationScheme; + }; + + // Fallback resource metadata (overridden per-request by Events) + options.ResourceMetadata = new ProtectedResourceMetadata + { + BearerMethodsSupported = { "header" }, + ScopesSupported = { "mcp" }, + }; + + options.Events = new McpAuthenticationEvents + { + OnResourceMetadataRequest = ctx => + { + var req = ctx.HttpContext.Request; + var origin = $"{req.Scheme}://{req.Host}"; + ctx.ResourceMetadata = new ProtectedResourceMetadata + { + Resource = $"{origin}/mcp", + BearerMethodsSupported = { "header" }, + ScopesSupported = { "mcp" }, + AuthorizationServers = { $"{origin}/connect" }, + }; + return Task.CompletedTask; + } + }; + } + extension(TBuilder builder) where TBuilder : MeshBuilder { /// @@ -330,7 +381,11 @@ public TBuilder ConfigureMemexMesh(IConfiguration configuration, bool isDevelopm config = config.AddContentCollection(_ => nodeContentConfig); } - return config.AddDefaultLayoutAreas().AddThreadsLayoutArea().AddApiTokensSettingsTab(); + return config + .WithHeartBeatHandler() // silently ack heartbeats on every per-node hub + .AddDefaultLayoutAreas() + .AddThreadsLayoutArea() + .AddApiTokensSettingsTab(); }) // Add activity tracking to record user access patterns via ActivityLogBundler .AddActivityTracking(); diff --git a/memex/Memex.Portal.Shared/Pages/Onboarding.razor b/memex/Memex.Portal.Shared/Pages/Onboarding.razor index 7036bf697..96b5ebab1 100644 --- a/memex/Memex.Portal.Shared/Pages/Onboarding.razor +++ b/memex/Memex.Portal.Shared/Pages/Onboarding.razor @@ -134,6 +134,7 @@ Email = model.Email.Trim(), Bio = string.IsNullOrWhiteSpace(model.Bio) ? null : model.Bio.Trim(), Role = string.IsNullOrWhiteSpace(model.Role) ? null : model.Role.Trim(), + PinnedPaths = ["Doc"], }; // Use ImpersonateAsHub so the portal hub identity is recognized diff --git a/memex/aspire/Memex.AppHost/Program.cs b/memex/aspire/Memex.AppHost/Program.cs index 674ff47ad..4c536b79a 100644 --- a/memex/aspire/Memex.AppHost/Program.cs +++ b/memex/aspire/Memex.AppHost/Program.cs @@ -137,10 +137,10 @@ .WithEnvironment("Anthropic__Endpoint", "https://s-meshweaver.services.ai.azure.com/anthropic/") .WithEnvironment("Anthropic__ApiKey", azureFoundryKey) .WithEnvironment("Anthropic__Models__0", "claude-sonnet-4-6") - .WithEnvironment("Anthropic__Models__1", "claude-opus-4-6") + .WithEnvironment("Anthropic__Models__1", "claude-opus-4-7") .WithEnvironment("Anthropic__Models__2", "claude-haiku-4-5") // Model tiers: map agent tiers to concrete models - .WithEnvironment("ModelTier__Heavy", "claude-opus-4-6") + .WithEnvironment("ModelTier__Heavy", "claude-opus-4-7") .WithEnvironment("ModelTier__Standard", "claude-sonnet-4-6") .WithEnvironment("ModelTier__Light", "claude-haiku-4-5") // LLM: Azure OpenAI diff --git a/memex/aspire/Memex.Database.Migration/Properties/launchSettings.json b/memex/aspire/Memex.Database.Migration/Properties/launchSettings.json new file mode 100644 index 000000000..ad7baef75 --- /dev/null +++ b/memex/aspire/Memex.Database.Migration/Properties/launchSettings.json @@ -0,0 +1,11 @@ +{ + "profiles": { + "Migration": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "DOTNET_ENVIRONMENT": "Development" + } + } + } +} diff --git a/memex/aspire/Memex.Portal.Distributed/appsettings.Development.json b/memex/aspire/Memex.Portal.Distributed/appsettings.Development.json index 6e539eb49..a1139b132 100644 --- a/memex/aspire/Memex.Portal.Distributed/appsettings.Development.json +++ b/memex/aspire/Memex.Portal.Distributed/appsettings.Development.json @@ -4,8 +4,9 @@ "LogLevel": { "Default": "Warning", "Microsoft.AspNetCore": "Warning", - "MeshWeaver": "Warning", + "MeshWeaver": "Information", "MeshWeaver.AI": "Information", + "MeshWeaver.Graph.Configuration": "Information", "MeshWeaver.Layout.ConvertJson": "Warning", "MeshWeaver.Messaging.Hub.MessageHub": "Warning", "Azure.Core": "Warning", diff --git a/samples/Graph/Data/SocialMedia/Post.json b/samples/Graph/Data/SocialMedia/Post.json new file mode 100644 index 000000000..ae79018b4 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post.json @@ -0,0 +1,19 @@ +{ + "id": "Post", + "namespace": "SocialMedia", + "name": "Social Media Post", + "nodeType": "NodeType", + "category": "Types", + "description": "A social media post (scheduled, published, with engagement stats)", + "icon": "/static/NodeTypeIcons/document.svg", + "isPersistent": true, + "content": { + "$type": "NodeTypeDefinition", + "id": "Post", + "namespace": "SocialMedia", + "displayName": "Social Media Post", + "iconName": "DocumentText", + "description": "A social media post (scheduled, published, with engagement stats)", + "configuration": "config => config.WithContentType().AddData(data => data.AddSource(source => source.WithType(t => t.WithInitialData(Platform.All)))).AddDefaultLayoutAreas().AddLayout(layout => layout.AddSocialMediaPostLayoutAreas().WithDefaultArea(\"Calendar\"))" + } +} diff --git a/samples/Graph/Data/SocialMedia/Post/Post-001.json b/samples/Graph/Data/SocialMedia/Post/Post-001.json new file mode 100644 index 000000000..c6fc7e1d3 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/Post-001.json @@ -0,0 +1,22 @@ +{ + "id": "Post-001", + "namespace": "SocialMedia/Post", + "path": "SocialMedia/Post/Post-001", + "name": "Why we bet on the actor model", + "nodeType": "SocialMedia/Post", + "icon": "/static/NodeTypeIcons/document.svg", + "isPersistent": true, + "content": { + "$type": "SocialMediaPost", + "title": "Why we bet on the actor model", + "body": "Reactive systems live or die on isolation. After years of fighting threadpools, we leaned into the actor model with Orleans \u2014 and never looked back.\n\nWhat surprised us most? **The debugging story is dramatically better.**", + "profilePath": "SocialMedia/Profile/Roland-LinkedIn", + "platform": "LinkedIn", + "scheduledAt": "2026-04-05T09:00:00+02:00", + "publishedAt": "2026-04-05T09:01:42+02:00", + "impressions": 4321, + "likes": 187, + "comments": 24, + "mediaUrl": "https://picsum.photos/seed/meshweaver-actors/800/450" + } +} diff --git a/samples/Graph/Data/SocialMedia/Post/Post-002.json b/samples/Graph/Data/SocialMedia/Post/Post-002.json new file mode 100644 index 000000000..7c4121811 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/Post-002.json @@ -0,0 +1,21 @@ +{ + "id": "Post-002", + "namespace": "SocialMedia/Post", + "path": "SocialMedia/Post/Post-002", + "name": "Three rules for building reactive UIs that scale", + "nodeType": "SocialMedia/Post", + "icon": "/static/NodeTypeIcons/document.svg", + "isPersistent": true, + "content": { + "$type": "SocialMediaPost", + "title": "Three rules for building reactive UIs that scale", + "body": "1. Never await in a hub handler.\n2. State changes flow through immutable streams.\n3. Click handlers are projections, not orchestrations.\n\nThis took us a while to internalize \u2014 worth a thread.", + "profilePath": "SocialMedia/Profile/Roland-LinkedIn", + "platform": "LinkedIn", + "scheduledAt": "2026-04-12T08:30:00+02:00", + "publishedAt": "2026-04-12T08:31:18+02:00", + "impressions": 6890, + "likes": 312, + "comments": 41 + } +} diff --git a/samples/Graph/Data/SocialMedia/Post/Post-003.json b/samples/Graph/Data/SocialMedia/Post/Post-003.json new file mode 100644 index 000000000..6bc2572eb --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/Post-003.json @@ -0,0 +1,22 @@ +{ + "id": "Post-003", + "namespace": "SocialMedia/Post", + "path": "SocialMedia/Post/Post-003", + "name": "Customer story: how Northwind cut reporting latency 10x", + "nodeType": "SocialMedia/Post", + "icon": "/static/NodeTypeIcons/document.svg", + "isPersistent": true, + "content": { + "$type": "SocialMediaPost", + "title": "Customer story: how Northwind cut reporting latency 10x", + "body": "When the analytics team at Northwind moved their pricing dashboards onto MeshWeaver, page renders dropped from 4.1s to 380ms.\n\nFull case study in comments.", + "profilePath": "SocialMedia/Profile/Sarah-LinkedIn", + "platform": "LinkedIn", + "scheduledAt": "2026-04-08T14:00:00+02:00", + "publishedAt": "2026-04-08T14:00:51+02:00", + "impressions": 9120, + "likes": 421, + "comments": 67, + "mediaUrl": "https://picsum.photos/seed/northwind-case/800/450" + } +} diff --git a/samples/Graph/Data/SocialMedia/Post/Post-004.json b/samples/Graph/Data/SocialMedia/Post/Post-004.json new file mode 100644 index 000000000..23b217175 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/Post-004.json @@ -0,0 +1,20 @@ +{ + "id": "Post-004", + "namespace": "SocialMedia/Post", + "path": "SocialMedia/Post/Post-004", + "name": "Live demo Thursday: agentic data exploration", + "nodeType": "SocialMedia/Post", + "icon": "/static/NodeTypeIcons/document.svg", + "isPersistent": true, + "content": { + "$type": "SocialMediaPost", + "title": "Live demo Thursday: agentic data exploration", + "body": "Join us this Thursday for a 30-minute live demo of our new Navigator agent. Bring questions \u2014 we'll answer them on a real dataset.\n\nRegister via the link in comments.", + "profilePath": "SocialMedia/Profile/Roland-LinkedIn", + "platform": "LinkedIn", + "scheduledAt": "2026-04-22T17:00:00+02:00", + "impressions": 0, + "likes": 0, + "comments": 0 + } +} diff --git a/samples/Graph/Data/SocialMedia/Post/Post-005.json b/samples/Graph/Data/SocialMedia/Post/Post-005.json new file mode 100644 index 000000000..a37ab25da --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/Post-005.json @@ -0,0 +1,20 @@ +{ + "id": "Post-005", + "namespace": "SocialMedia/Post", + "path": "SocialMedia/Post/Post-005", + "name": "We're hiring: Senior Backend Engineer", + "nodeType": "SocialMedia/Post", + "icon": "/static/NodeTypeIcons/document.svg", + "isPersistent": true, + "content": { + "$type": "SocialMediaPost", + "title": "We're hiring: Senior Backend Engineer", + "body": "We are growing the platform team. Looking for someone who loves distributed systems, reactive design and shipping frequently.\n\nDM me or apply via the careers page.", + "profilePath": "SocialMedia/Profile/Sarah-LinkedIn", + "platform": "LinkedIn", + "scheduledAt": "2026-04-25T10:00:00+02:00", + "impressions": 0, + "likes": 0, + "comments": 0 + } +} diff --git a/samples/Graph/Data/SocialMedia/Post/Post-006.json b/samples/Graph/Data/SocialMedia/Post/Post-006.json new file mode 100644 index 000000000..b356aec45 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/Post-006.json @@ -0,0 +1,20 @@ +{ + "id": "Post-006", + "namespace": "SocialMedia/Post", + "path": "SocialMedia/Post/Post-006", + "name": "April recap: what shipped and what's next", + "nodeType": "SocialMedia/Post", + "icon": "/static/NodeTypeIcons/document.svg", + "isPersistent": true, + "content": { + "$type": "SocialMediaPost", + "title": "April recap: what shipped and what's next", + "body": "A quick look back at April:\n\n- Annotation system: \u2705\n- MCP OAuth discovery: \u2705\n- Aggregating providers: \u2705\n\nNext month we open up the social media planner publicly. Stay tuned.", + "profilePath": "SocialMedia/Profile/Roland-LinkedIn", + "platform": "LinkedIn", + "scheduledAt": "2026-04-29T08:00:00+02:00", + "impressions": 0, + "likes": 0, + "comments": 0 + } +} diff --git a/samples/Graph/Data/SocialMedia/Post/_Source/Platform.cs b/samples/Graph/Data/SocialMedia/Post/_Source/Platform.cs new file mode 100644 index 000000000..ae7655974 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/_Source/Platform.cs @@ -0,0 +1,39 @@ +// +// Id: Platform +// DisplayName: Social Media Platform +// + +public record Platform +{ + [Key] + public string Id { get; init; } = string.Empty; + + [Required] + public string Name { get; init; } = string.Empty; + + public string Emoji { get; init; } = string.Empty; + + public string Color { get; init; } = "#0a66c2"; + + public int Order { get; init; } + + public static readonly Platform LinkedIn = new() + { + Id = "LinkedIn", Name = "LinkedIn", Emoji = "\ud83d\udcbc", Color = "#0a66c2", Order = 0 + }; + + public static readonly Platform Twitter = new() + { + Id = "Twitter", Name = "X / Twitter", Emoji = "\ud83d\udc26", Color = "#000000", Order = 1 + }; + + public static readonly Platform Instagram = new() + { + Id = "Instagram", Name = "Instagram", Emoji = "\ud83d\udcf7", Color = "#e1306c", Order = 2 + }; + + public static readonly Platform[] All = [LinkedIn, Twitter, Instagram]; + + public static Platform GetById(string? id) => + All.FirstOrDefault(p => p.Id == id) ?? LinkedIn; +} diff --git a/samples/Graph/Data/SocialMedia/Post/_Source/SocialMediaPost.cs b/samples/Graph/Data/SocialMedia/Post/_Source/SocialMediaPost.cs new file mode 100644 index 000000000..223b048a8 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/_Source/SocialMediaPost.cs @@ -0,0 +1,40 @@ +// +// Id: SocialMediaPost +// DisplayName: Social Media Post +// + +using MeshWeaver.Domain; + +public record SocialMediaPost +{ + [Required] + [MeshNodeProperty(nameof(MeshNode.Name))] + [UiControl(Style = "width: 100%;")] + public string Title { get; init; } = string.Empty; + + [Markdown(EditorHeight = "200px")] + public string? Body { get; init; } + + [Required] + [DisplayName("Profile path")] + public string ProfilePath { get; init; } = string.Empty; + + [Dimension] + [UiControl(Style = "width: 200px;")] + public string Platform { get; init; } = "LinkedIn"; + + [DisplayName("Scheduled at")] + public DateTimeOffset? ScheduledAt { get; init; } + + [DisplayName("Published at")] + public DateTimeOffset? PublishedAt { get; init; } + + public int Impressions { get; init; } + + public int Likes { get; init; } + + public int Comments { get; init; } + + [DisplayName("Media URL")] + public string? MediaUrl { get; init; } +} diff --git a/samples/Graph/Data/SocialMedia/Post/_Source/SocialMediaPostLayoutAreas.cs b/samples/Graph/Data/SocialMedia/Post/_Source/SocialMediaPostLayoutAreas.cs new file mode 100644 index 000000000..39f169598 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Post/_Source/SocialMediaPostLayoutAreas.cs @@ -0,0 +1,327 @@ +// +// Id: SocialMediaPostLayoutAreas +// DisplayName: Social Media Post Views +// + +using System.Collections.Immutable; +using System.Globalization; +using System.Text; +using System.Text.Json; +using System.Web; +using MeshWeaver.Layout.Composition; +using MeshWeaver.Mesh.Services; + +public static class SocialMediaPostLayoutAreas +{ + public const string CalendarArea = "Calendar"; + public const string DetailArea = "Detail"; + + public static LayoutDefinition AddSocialMediaPostLayoutAreas(this LayoutDefinition layout) => + layout + .WithView(CalendarArea, Calendar) + .WithView(DetailArea, Detail); + + private static Dictionary ApplyChanges( + Dictionary current, QueryResultChange change) + { + var result = change.ChangeType == QueryChangeType.Initial || change.ChangeType == QueryChangeType.Reset + ? new Dictionary(StringComparer.OrdinalIgnoreCase) + : new Dictionary(current, StringComparer.OrdinalIgnoreCase); + foreach (var item in change.Items) + { + if (change.ChangeType == QueryChangeType.Removed) result.Remove(item.Path); + else result[item.Path] = item; + } + return result; + } + + private static string? GetProp(MeshNode node, string prop) + { + if (node.Content is not JsonElement json) return null; + if (json.TryGetProperty(prop, out var p) && p.ValueKind == JsonValueKind.String) return p.GetString(); + var pascal = char.ToUpperInvariant(prop[0]) + prop.Substring(1); + return json.TryGetProperty(pascal, out var pp) && pp.ValueKind == JsonValueKind.String ? pp.GetString() : null; + } + + private static int GetInt(MeshNode node, string prop) + { + if (node.Content is not JsonElement json) return 0; + var name = prop; + if (!json.TryGetProperty(name, out var p)) + { + name = char.ToUpperInvariant(prop[0]) + prop.Substring(1); + if (!json.TryGetProperty(name, out p)) return 0; + } + return p.ValueKind == JsonValueKind.Number && p.TryGetInt32(out var v) ? v : 0; + } + + private static DateTimeOffset? GetDate(MeshNode node, string prop) + { + if (node.Content is not JsonElement json) return null; + var name = prop; + if (!json.TryGetProperty(name, out var p)) + { + name = char.ToUpperInvariant(prop[0]) + prop.Substring(1); + if (!json.TryGetProperty(name, out p)) return null; + } + return p.ValueKind == JsonValueKind.String && DateTimeOffset.TryParse(p.GetString(), out var dt) ? dt : null; + } + + private const string FilterMy = "my"; + private const string FilterAll = "all"; + + public static IObservable Calendar(LayoutAreaHost host, RenderingContext _) + { + var meshService = host.Hub.ServiceProvider.GetRequiredService(); + var hubAddress = host.Hub.Address; + var currentEmail = host.Hub.ServiceProvider.GetService()?.Context?.Email ?? ""; + + var idStr = host.Reference.Id?.ToString() ?? ""; + var monthPart = idStr.Split('?')[0]; + var month = TryParseMonth(monthPart, out var parsed) ? parsed : new DateTime(DateTime.Today.Year, DateTime.Today.Month, 1); + var filter = host.Reference.GetParameterValue("profile") ?? FilterMy; + + var posts = meshService + .ObserveQuery(MeshQueryRequest.FromQuery("namespace:SocialMedia/Post")) + .Scan(new Dictionary(StringComparer.OrdinalIgnoreCase), ApplyChanges); + var profiles = meshService + .ObserveQuery(MeshQueryRequest.FromQuery("namespace:SocialMedia/Profile")) + .Scan(new Dictionary(StringComparer.OrdinalIgnoreCase), ApplyChanges); + + return posts.CombineLatest(profiles, (postDict, profileDict) => + (UiControl?)BuildCalendar(hubAddress, month, filter, currentEmail, postDict.Values.ToImmutableList(), profileDict.Values.ToImmutableList())); + } + + private static UiControl BuildCalendar( + object hubAddress, DateTime month, string filter, string currentEmail, + ImmutableList allPosts, ImmutableList allProfiles) + { + var profilesByPath = allProfiles.ToImmutableDictionary(p => p.Path, p => p, StringComparer.OrdinalIgnoreCase); + var myProfilePaths = allProfiles + .Where(p => string.Equals(GetProp(p, "owner"), currentEmail, StringComparison.OrdinalIgnoreCase)) + .Select(p => p.Path) + .ToImmutableHashSet(StringComparer.OrdinalIgnoreCase); + + bool MatchesFilter(MeshNode post) + { + var profilePath = GetProp(post, "profilePath") ?? ""; + return filter switch + { + FilterAll => true, + FilterMy => myProfilePaths.Contains(profilePath), + _ => string.Equals(profilePath, filter, StringComparison.OrdinalIgnoreCase) + || string.Equals(profilePath.Split('/').Last(), filter, StringComparison.OrdinalIgnoreCase) + }; + } + + var monthPosts = allPosts + .Where(p => GetDate(p, "scheduledAt") is { } d && d.Year == month.Year && d.Month == month.Month) + .Where(MatchesFilter) + .OrderBy(p => GetDate(p, "scheduledAt")) + .ToImmutableList(); + + var prev = month.AddMonths(-1); + var next = month.AddMonths(1); + var prevHref = BuildHref(hubAddress, prev, filter); + var nextHref = BuildHref(hubAddress, next, filter); + + var toolbar = Controls.Stack + .WithOrientation(Orientation.Horizontal) + .WithStyle("align-items: center; gap: 12px; flex-wrap: wrap; padding: 8px 0;") + .WithView(Controls.Button("\u2039") + .WithAppearance(Appearance.Outline) + .WithNavigateToHref(prevHref)) + .WithView(Controls.Html($"

{HttpUtility.HtmlEncode(month.ToString("MMMM yyyy", CultureInfo.InvariantCulture))}

")) + .WithView(Controls.Button("\u203a") + .WithAppearance(Appearance.Outline) + .WithNavigateToHref(nextHref)) + .WithView(Controls.Html("
")) + .WithView(FilterButton(hubAddress, month, "My profiles", FilterMy, filter)) + .WithView(FilterButton(hubAddress, month, "All", FilterAll, filter)); + + foreach (var profile in allProfiles.OrderBy(p => p.Name)) + { + var label = profile.Name ?? profile.Path; + toolbar = toolbar.WithView(FilterButton(hubAddress, month, label, profile.Path, filter)); + } + + var grid = Controls.Html(BuildMonthGridHtml(month, monthPosts, profilesByPath)); + + var emptyHint = monthPosts.Count == 0 + ? Controls.Markdown($"*No posts scheduled in {month:MMMM yyyy} for this filter.*") + : null; + + var stack = Controls.Stack + .WithStyle("padding: 16px; gap: 12px;") + .WithView(toolbar) + .WithView(grid); + if (emptyHint != null) + stack = stack.WithView(emptyHint); + return stack; + } + + private static ButtonControl FilterButton(object hubAddress, DateTime month, string label, string filterValue, string activeFilter) + { + var isActive = string.Equals(filterValue, activeFilter, StringComparison.OrdinalIgnoreCase); + var btn = Controls.Button(label) + .WithAppearance(isActive ? Appearance.Accent : Appearance.Stealth) + .WithNavigateToHref(BuildHref(hubAddress, month, filterValue)); + return btn; + } + + private static string BuildHref(object hubAddress, DateTime month, string filter) + { + var id = $"{month:yyyy-MM}"; + if (!string.Equals(filter, FilterMy, StringComparison.OrdinalIgnoreCase)) + id += $"?profile={Uri.EscapeDataString(filter)}"; + return new LayoutAreaReference(CalendarArea) { Id = id }.ToHref(hubAddress.ToString()!); + } + + private static string BuildMonthGridHtml( + DateTime month, + ImmutableList monthPosts, + ImmutableDictionary profilesByPath) + { + var firstOfMonth = new DateTime(month.Year, month.Month, 1); + // Monday = 1, Sunday = 0; we want week to start Monday + var dayOffset = ((int)firstOfMonth.DayOfWeek + 6) % 7; + var gridStart = firstOfMonth.AddDays(-dayOffset); + var daysInMonth = DateTime.DaysInMonth(month.Year, month.Month); + + var postsByDay = monthPosts + .GroupBy(p => GetDate(p, "scheduledAt")!.Value.Date) + .ToImmutableDictionary(g => g.Key, g => g.ToImmutableList()); + + var sb = new StringBuilder(); + sb.Append("
"); + + // Day-of-week header + string[] dayNames = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]; + foreach (var d in dayNames) + sb.Append($"
{d}
"); + + var today = DateTime.Today; + for (var i = 0; i < 42; i++) + { + var date = gridStart.AddDays(i); + var isCurrentMonth = date.Month == month.Month && date.Year == month.Year; + var isToday = date == today; + var bg = isCurrentMonth ? "#ffffff" : "#f7f7f7"; + var fg = isCurrentMonth ? "#222" : "#aaa"; + var border = isToday ? "2px solid var(--accent-fill-rest, #0a66c2)" : "1px solid #e5e5e5"; + + sb.Append($"
"); + sb.Append($"
{date.Day}
"); + + if (postsByDay.TryGetValue(date, out var dayPosts)) + { + foreach (var post in dayPosts.Take(3)) + { + var title = post.Name ?? GetProp(post, "title") ?? "(untitled)"; + var profilePath = GetProp(post, "profilePath") ?? ""; + var profile = profilesByPath.GetValueOrDefault(profilePath); + var platformId = GetProp(post, "platform") ?? GetProp(profile ?? new MeshNode(""), "platform") ?? "LinkedIn"; + var platform = Platform.GetById(platformId); + var isPublished = GetDate(post, "publishedAt") is not null; + var icon = isPublished ? "\u2705" : "\ud83d\udcc5"; + var href = "/" + post.Path; + sb.Append($"{icon} {HttpUtility.HtmlEncode(Truncate(title, 22))}"); + } + if (dayPosts.Count > 3) + sb.Append($"
+{dayPosts.Count - 3} more
"); + } + sb.Append("
"); + } + + sb.Append("
"); + return sb.ToString(); + } + + private static string Truncate(string s, int max) => + s.Length <= max ? s : s.Substring(0, max - 1) + "\u2026"; + + private static bool TryParseMonth(string? s, out DateTime month) + { + month = default; + if (string.IsNullOrWhiteSpace(s)) return false; + return DateTime.TryParseExact(s, "yyyy-MM", CultureInfo.InvariantCulture, DateTimeStyles.None, out month); + } + + public static IObservable Detail(LayoutAreaHost host, RenderingContext _) + { + var hubPath = host.Hub.Address.ToString(); + var meshService = host.Hub.ServiceProvider.GetRequiredService(); + + var nodeStream = host.Workspace.GetStream()! + .Select(nodes => nodes?.FirstOrDefault(n => n.Path == hubPath)); + var profiles = meshService + .ObserveQuery(MeshQueryRequest.FromQuery("namespace:SocialMedia/Profile")) + .Scan(new Dictionary(StringComparer.OrdinalIgnoreCase), ApplyChanges); + + return nodeStream.CombineLatest(profiles, (node, profileDict) => + { + if (node is null) return (UiControl?)Controls.Markdown("*Post not found.*"); + + var title = node.Name ?? GetProp(node, "title") ?? "(untitled)"; + var body = GetProp(node, "body"); + var profilePath = GetProp(node, "profilePath") ?? ""; + var profile = profileDict.GetValueOrDefault(profilePath); + var profileName = profile?.Name ?? profilePath.Split('/').Last(); + var platformId = GetProp(node, "platform") ?? (profile is not null ? GetProp(profile, "platform") : null) ?? "LinkedIn"; + var platform = Platform.GetById(platformId); + var scheduled = GetDate(node, "scheduledAt"); + var published = GetDate(node, "publishedAt"); + var impressions = GetInt(node, "impressions"); + var likes = GetInt(node, "likes"); + var comments = GetInt(node, "comments"); + var media = GetProp(node, "mediaUrl"); + var status = published.HasValue ? "Published" : (scheduled.HasValue && scheduled.Value > DateTimeOffset.Now ? "Scheduled" : "Draft"); + var statusColor = published.HasValue ? "#2e7d32" : "#ed6c02"; + + var headerHtml = $$""" +
+ {{platform.Emoji}} {{HttpUtility.HtmlEncode(platform.Name)}} + @{{HttpUtility.HtmlEncode(profileName)}} + {{status}} +
+ """; + + var datesHtml = $$""" + + + +
Scheduled{{HttpUtility.HtmlEncode(scheduled?.ToString("yyyy-MM-dd HH:mm") ?? "—")}}
Published{{HttpUtility.HtmlEncode(published?.ToString("yyyy-MM-dd HH:mm") ?? "—")}}
+ """; + + var statsHtml = $$""" +
+
Impressions
{{impressions:N0}}
+
Likes
{{likes:N0}}
+
Comments
{{comments:N0}}
+
+ """; + + var mediaHtml = ""; + if (!string.IsNullOrEmpty(media)) + { + var lower = media.ToLowerInvariant(); + if (lower.EndsWith(".mp4") || lower.EndsWith(".webm") || lower.EndsWith(".mov")) + mediaHtml = $""; + else + mediaHtml = $"\"media\""; + } + + var headerStack = Controls.Stack.WithStyle("padding: 16px; gap: 4px;") + .WithView(Controls.Html($"

{HttpUtility.HtmlEncode(title)}

")) + .WithView(Controls.Html(headerHtml)) + .WithView(Controls.Html(datesHtml)) + .WithView(Controls.Html(statsHtml)); + if (!string.IsNullOrEmpty(mediaHtml)) + headerStack = headerStack.WithView(Controls.Html(mediaHtml)); + if (!string.IsNullOrWhiteSpace(body)) + headerStack = headerStack.WithView(Controls.Markdown(body)); + headerStack = headerStack.WithView(Controls.Html($"\u2190 Back to calendar")); + return (UiControl?)headerStack; + }); + } +} diff --git a/samples/Graph/Data/SocialMedia/Posts.json b/samples/Graph/Data/SocialMedia/Posts.json new file mode 100644 index 000000000..6249839ca --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Posts.json @@ -0,0 +1,17 @@ +{ + "id": "Posts", + "namespace": "SocialMedia", + "path": "SocialMedia/Posts", + "name": "Posts", + "nodeType": "Markdown", + "category": "SocialMedia", + "description": "Calendar overview of scheduled and published social media posts", + "icon": "/static/NodeTypeIcons/document.svg", + "isPersistent": true, + "content": { + "$type": "MarkdownDocument", + "title": "Social Media Calendar", + "icon": "/static/NodeTypeIcons/document.svg", + "content": "# Social Media Calendar\n\nPlan and review your scheduled LinkedIn posts. Use the arrows to switch months and the filter buttons to narrow down to a single profile. By default the calendar shows posts from your own profiles.\n\n@SocialMedia/Post/Calendar\n" + } +} diff --git a/samples/Graph/Data/SocialMedia/Profile.json b/samples/Graph/Data/SocialMedia/Profile.json new file mode 100644 index 000000000..3d36c72a9 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Profile.json @@ -0,0 +1,19 @@ +{ + "id": "Profile", + "namespace": "SocialMedia", + "name": "Social Media Profile", + "nodeType": "NodeType", + "category": "Types", + "description": "A social media profile owned by a user (LinkedIn, X, Instagram, ...)", + "icon": "/static/NodeTypeIcons/person.svg", + "isPersistent": true, + "content": { + "$type": "NodeTypeDefinition", + "id": "Profile", + "namespace": "SocialMedia", + "displayName": "Social Media Profile", + "iconName": "Person", + "description": "A social media profile owned by a user", + "configuration": "config => config.WithContentType().AddData(data => data.AddSource(source => source.WithType(t => t.WithInitialData(Platform.All)))).AddDefaultLayoutAreas().AddLayout(layout => layout.AddSocialMediaProfileLayoutAreas().WithDefaultArea(\"Detail\"))" + } +} diff --git a/samples/Graph/Data/SocialMedia/Profile/Roland-LinkedIn.json b/samples/Graph/Data/SocialMedia/Profile/Roland-LinkedIn.json new file mode 100644 index 000000000..e19d33ba4 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Profile/Roland-LinkedIn.json @@ -0,0 +1,17 @@ +{ + "id": "Roland-LinkedIn", + "namespace": "SocialMedia/Profile", + "path": "SocialMedia/Profile/Roland-LinkedIn", + "name": "Roland on LinkedIn", + "nodeType": "SocialMedia/Profile", + "icon": "/static/NodeTypeIcons/person.svg", + "isPersistent": true, + "content": { + "$type": "SocialMediaProfile", + "name": "Roland on LinkedIn", + "platform": "LinkedIn", + "owner": "rbuergi@systemorph.com", + "profileUrl": "https://www.linkedin.com/in/rolandbuergi/", + "bio": "Building MeshWeaver \u2014 collaborative actor-based runtime for data, AI and reactive UIs." + } +} diff --git a/samples/Graph/Data/SocialMedia/Profile/Sarah-LinkedIn.json b/samples/Graph/Data/SocialMedia/Profile/Sarah-LinkedIn.json new file mode 100644 index 000000000..29c2dbb2f --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Profile/Sarah-LinkedIn.json @@ -0,0 +1,17 @@ +{ + "id": "Sarah-LinkedIn", + "namespace": "SocialMedia/Profile", + "path": "SocialMedia/Profile/Sarah-LinkedIn", + "name": "Sarah on LinkedIn", + "nodeType": "SocialMedia/Profile", + "icon": "/static/NodeTypeIcons/person.svg", + "isPersistent": true, + "content": { + "$type": "SocialMediaProfile", + "name": "Sarah on LinkedIn", + "platform": "LinkedIn", + "owner": "sarah@example.com", + "profileUrl": "https://www.linkedin.com/in/sarah-example/", + "bio": "Marketing lead. Posts about product launches and customer stories." + } +} diff --git a/samples/Graph/Data/SocialMedia/Profile/_Source/Platform.cs b/samples/Graph/Data/SocialMedia/Profile/_Source/Platform.cs new file mode 100644 index 000000000..ae7655974 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Profile/_Source/Platform.cs @@ -0,0 +1,39 @@ +// +// Id: Platform +// DisplayName: Social Media Platform +// + +public record Platform +{ + [Key] + public string Id { get; init; } = string.Empty; + + [Required] + public string Name { get; init; } = string.Empty; + + public string Emoji { get; init; } = string.Empty; + + public string Color { get; init; } = "#0a66c2"; + + public int Order { get; init; } + + public static readonly Platform LinkedIn = new() + { + Id = "LinkedIn", Name = "LinkedIn", Emoji = "\ud83d\udcbc", Color = "#0a66c2", Order = 0 + }; + + public static readonly Platform Twitter = new() + { + Id = "Twitter", Name = "X / Twitter", Emoji = "\ud83d\udc26", Color = "#000000", Order = 1 + }; + + public static readonly Platform Instagram = new() + { + Id = "Instagram", Name = "Instagram", Emoji = "\ud83d\udcf7", Color = "#e1306c", Order = 2 + }; + + public static readonly Platform[] All = [LinkedIn, Twitter, Instagram]; + + public static Platform GetById(string? id) => + All.FirstOrDefault(p => p.Id == id) ?? LinkedIn; +} diff --git a/samples/Graph/Data/SocialMedia/Profile/_Source/SocialMediaProfile.cs b/samples/Graph/Data/SocialMedia/Profile/_Source/SocialMediaProfile.cs new file mode 100644 index 000000000..0dd054018 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Profile/_Source/SocialMediaProfile.cs @@ -0,0 +1,31 @@ +// +// Id: SocialMediaProfile +// DisplayName: Social Media Profile +// + +using MeshWeaver.Domain; + +public record SocialMediaProfile +{ + [Required] + [MeshNodeProperty(nameof(MeshNode.Name))] + public string Name { get; init; } = string.Empty; + + [Required] + [Dimension] + [UiControl(Style = "width: 200px;")] + public string Platform { get; init; } = "LinkedIn"; + + [Required] + [DisplayName("Owner email")] + public string Owner { get; init; } = string.Empty; + + [DisplayName("Profile URL")] + public string? ProfileUrl { get; init; } + + [DisplayName("Avatar URL")] + public string? AvatarUrl { get; init; } + + [Markdown(EditorHeight = "120px")] + public string? Bio { get; init; } +} diff --git a/samples/Graph/Data/SocialMedia/Profile/_Source/SocialMediaProfileLayoutAreas.cs b/samples/Graph/Data/SocialMedia/Profile/_Source/SocialMediaProfileLayoutAreas.cs new file mode 100644 index 000000000..d44e33ef9 --- /dev/null +++ b/samples/Graph/Data/SocialMedia/Profile/_Source/SocialMediaProfileLayoutAreas.cs @@ -0,0 +1,70 @@ +// +// Id: SocialMediaProfileLayoutAreas +// DisplayName: Social Media Profile Views +// + +using System.Text.Json; +using System.Web; +using MeshWeaver.Layout.Composition; +using MeshWeaver.Mesh.Services; + +public static class SocialMediaProfileLayoutAreas +{ + public static LayoutDefinition AddSocialMediaProfileLayoutAreas(this LayoutDefinition layout) => + layout.WithView("Detail", Detail); + + private static string? GetProp(MeshNode node, string prop) + { + if (node.Content is not JsonElement json) return null; + if (json.TryGetProperty(prop, out var p) && p.ValueKind == JsonValueKind.String) return p.GetString(); + var pascal = char.ToUpperInvariant(prop[0]) + prop.Substring(1); + return json.TryGetProperty(pascal, out var pp) && pp.ValueKind == JsonValueKind.String ? pp.GetString() : null; + } + + public static IObservable Detail(LayoutAreaHost host, RenderingContext _) + { + var hubPath = host.Hub.Address.ToString(); + var meshService = host.Hub.ServiceProvider.GetRequiredService(); + + return host.Workspace.GetStream()! + .Select(nodes => + { + var node = nodes?.FirstOrDefault(n => n.Path == hubPath); + if (node is null) return (UiControl?)Controls.Markdown("*Profile not found.*"); + + var name = node.Name ?? GetProp(node, "name") ?? "Profile"; + var platformId = GetProp(node, "platform") ?? "LinkedIn"; + var platform = Platform.GetById(platformId); + var owner = GetProp(node, "owner") ?? ""; + var profileUrl = GetProp(node, "profileUrl"); + var avatarUrl = GetProp(node, "avatarUrl"); + var bio = GetProp(node, "bio"); + + var avatar = !string.IsNullOrEmpty(avatarUrl) + ? $"\"avatar\"" + : $"
{platform.Emoji}
"; + + var link = !string.IsNullOrEmpty(profileUrl) + ? $"Open profile \u2197" + : "No profile URL"; + + var html = $$""" +
+ {{avatar}} +
+

{{HttpUtility.HtmlEncode(name)}}

+
{{platform.Emoji}} {{HttpUtility.HtmlEncode(platform.Name)}}
+
Owner: {{HttpUtility.HtmlEncode(owner)}}
+
{{link}}
+
+
+ """; + + var stack = Controls.Stack.WithStyle("padding: 16px;") + .WithView(Controls.Html(html)); + if (!string.IsNullOrEmpty(bio)) + stack = stack.WithView(Controls.Markdown(bio)); + return (UiControl?)stack; + }); + } +} diff --git a/samples/Graph/Data/User/Alice.json b/samples/Graph/Data/User/Alice.json index 941459f5b..a2b8d55cc 100644 --- a/samples/Graph/Data/User/Alice.json +++ b/samples/Graph/Data/User/Alice.json @@ -8,6 +8,7 @@ "content": { "$type": "User", "email": "alice.chen@meshweaver.io", - "bio": "Software engineer and project contributor." + "bio": "Software engineer and project contributor.", + "pinnedPaths": ["Doc"] } } diff --git a/samples/Graph/Data/User/Bob.json b/samples/Graph/Data/User/Bob.json index 30b66de78..a53f803ed 100644 --- a/samples/Graph/Data/User/Bob.json +++ b/samples/Graph/Data/User/Bob.json @@ -8,6 +8,7 @@ "content": { "$type": "User", "email": "bob.smith@meshweaver.io", - "bio": "Software engineer and project contributor." + "bio": "Software engineer and project contributor.", + "pinnedPaths": ["Doc"] } } diff --git a/samples/Graph/Data/User/Carol.json b/samples/Graph/Data/User/Carol.json index 273be92dc..e9029f9c1 100644 --- a/samples/Graph/Data/User/Carol.json +++ b/samples/Graph/Data/User/Carol.json @@ -8,6 +8,7 @@ "content": { "$type": "User", "email": "carol.johnson@meshweaver.io", - "bio": "Software engineer and project contributor." + "bio": "Software engineer and project contributor.", + "pinnedPaths": ["Doc"] } } diff --git a/samples/Graph/Data/User/David.json b/samples/Graph/Data/User/David.json index 25bcf73bc..65812c1e0 100644 --- a/samples/Graph/Data/User/David.json +++ b/samples/Graph/Data/User/David.json @@ -8,6 +8,7 @@ "content": { "$type": "User", "email": "david.lee@meshweaver.io", - "bio": "Software engineer and project contributor." + "bio": "Software engineer and project contributor.", + "pinnedPaths": ["Doc"] } } diff --git a/samples/Graph/Data/User/Emma.json b/samples/Graph/Data/User/Emma.json index cbcc5ba06..6a542fb39 100644 --- a/samples/Graph/Data/User/Emma.json +++ b/samples/Graph/Data/User/Emma.json @@ -8,6 +8,7 @@ "content": { "$type": "User", "email": "emma.wilson@meshweaver.io", - "bio": "Software engineer and project contributor." + "bio": "Software engineer and project contributor.", + "pinnedPaths": ["Doc"] } } diff --git a/samples/Graph/Data/User/Roland.json b/samples/Graph/Data/User/Roland.json index 691c0361f..f07375258 100644 --- a/samples/Graph/Data/User/Roland.json +++ b/samples/Graph/Data/User/Roland.json @@ -8,6 +8,7 @@ "content": { "$type": "User", "email": "rbuergi@systemorph.com", - "bio": "Founder of Systemorph and creator of MeshWeaver." + "bio": "Founder of Systemorph and creator of MeshWeaver.", + "pinnedPaths": ["Doc"] } } diff --git a/samples/Graph/Data/User/Samuel.json b/samples/Graph/Data/User/Samuel.json index d15419703..f626bb32b 100644 --- a/samples/Graph/Data/User/Samuel.json +++ b/samples/Graph/Data/User/Samuel.json @@ -8,6 +8,7 @@ "content": { "$type": "User", "email": "sglauser@systemorph.com", - "bio": "Software engineer and MeshWeaver contributor." + "bio": "Software engineer and MeshWeaver contributor.", + "pinnedPaths": ["Doc"] } } diff --git a/src/MeshWeaver.AI.AzureFoundry/AzureClaudeChatClient.cs b/src/MeshWeaver.AI.AzureFoundry/AzureClaudeChatClient.cs index b43300f13..a96fc6f7c 100644 --- a/src/MeshWeaver.AI.AzureFoundry/AzureClaudeChatClient.cs +++ b/src/MeshWeaver.AI.AzureFoundry/AzureClaudeChatClient.cs @@ -134,6 +134,11 @@ public async IAsyncEnumerable GetStreamingResponseAsync( string? currentToolName = null; var currentToolInput = new StringBuilder(); + // Cumulative token counters across the stream (Anthropic emits input-tokens + // once on message_start and cumulative output-tokens on message_delta). + var inputTokens = 0; + var outputTokens = 0; + while ((line = await reader.ReadLineAsync(cancellationToken)) != null) { if (string.IsNullOrEmpty(line) || !line.StartsWith("data: ")) @@ -160,6 +165,13 @@ public async IAsyncEnumerable GetStreamingResponseAsync( { case "message_start": currentRole = streamEvent.Message?.Role ?? "assistant"; + if (streamEvent.Message?.Usage is { } startUsage) + { + inputTokens = startUsage.InputTokens; + // Anthropic reports cumulative output tokens on message_delta; + // seed with any initial value on message_start. + outputTokens = startUsage.OutputTokens; + } break; case "content_block_start": @@ -216,6 +228,12 @@ public async IAsyncEnumerable GetStreamingResponseAsync( break; case "message_delta": + if (streamEvent.Usage is { } deltaUsage) + { + // Anthropic: output_tokens on message_delta is the running cumulative + // count. Keep the latest so the final UsageContent below has the total. + outputTokens = deltaUsage.OutputTokens; + } if (streamEvent.Delta?.StopReason != null) { yield return new ChatResponseUpdate(ChatRole.Assistant, string.Empty) @@ -224,6 +242,22 @@ public async IAsyncEnumerable GetStreamingResponseAsync( }; } break; + + case "message_stop": + // Final UsageContent carries the totals — ThreadExecution stamps the + // response cell's InputTokens/OutputTokens/TotalTokens from this. + if (inputTokens > 0 || outputTokens > 0) + { + yield return new ChatResponseUpdate(ChatRole.Assistant, [ + new UsageContent(new UsageDetails + { + InputTokenCount = inputTokens, + OutputTokenCount = outputTokens, + TotalTokenCount = inputTokens + outputTokens + }) + ]); + } + break; } } } @@ -578,6 +612,8 @@ private class ClaudeStreamEvent public ClaudeStreamDelta? Delta { get; set; } public ClaudeStreamContentBlock? ContentBlock { get; set; } public int? Index { get; set; } + /// Populated on the message_delta event — cumulative output-token count. + public ClaudeUsage? Usage { get; set; } } private class ClaudeStreamMessage @@ -586,6 +622,8 @@ private class ClaudeStreamMessage public string? Type { get; set; } public string? Role { get; set; } public string? Model { get; set; } + /// Populated on the message_start event — input-token count for the turn. + public ClaudeUsage? Usage { get; set; } } private class ClaudeStreamContentBlock diff --git a/src/MeshWeaver.AI/AgentChatClient.cs b/src/MeshWeaver.AI/AgentChatClient.cs index 7fd7753c3..fbcb05f92 100644 --- a/src/MeshWeaver.AI/AgentChatClient.cs +++ b/src/MeshWeaver.AI/AgentChatClient.cs @@ -652,6 +652,15 @@ public async IAsyncEnumerable GetStreamingResponseAsync( AuthorName = currentAgentName ?? "Assistant" }; } + else if (content is UsageContent) + { + // Forward token-usage content so ThreadExecution can record + // InputTokens / OutputTokens / TotalTokens on the response cell. + yield return new ChatResponseUpdate(ChatRole.Assistant, [content]) + { + AuthorName = currentAgentName ?? "Assistant" + }; + } } } @@ -722,6 +731,13 @@ public async IAsyncEnumerable GetStreamingResponseAsync( AuthorName = currentAgentName ?? "Assistant" }; } + else if (content is UsageContent) + { + yield return new ChatResponseUpdate(ChatRole.Assistant, [content]) + { + AuthorName = currentAgentName ?? "Assistant" + }; + } } } diff --git a/src/MeshWeaver.AI/AgentView.cs b/src/MeshWeaver.AI/AgentView.cs index 77222032e..323b3d83e 100644 --- a/src/MeshWeaver.AI/AgentView.cs +++ b/src/MeshWeaver.AI/AgentView.cs @@ -447,7 +447,28 @@ private static UiControl BuildEditLayout(LayoutAreaHost host, AgentConfiguration new DataChangeRequest { ChangedBy = actx.Host.Stream.ClientId }.WithUpdates(updatedAgent), o => o.WithTarget(hubAddress))!; var callbackResponse = await actx.Host.Hub.RegisterCallback(delivery, (d, _) => Task.FromResult(d), cts.Token); - var responseMsg = ((IMessageDelivery)callbackResponse).Message; + + // Handle routing failures (e.g., agent hub unreachable) and unexpected + // response shapes before touching the DataChangeResponse fields. + if (callbackResponse is IMessageDelivery deliveryFailure) + { + var dialog = Controls.Dialog( + Controls.Markdown($"**Error saving:**\n\n{deliveryFailure.Message.Message ?? "Delivery failed"}"), + "Save Failed" + ).WithSize("M"); + actx.Host.UpdateArea(DialogControl.DialogArea, dialog); + return; + } + if (callbackResponse is not IMessageDelivery dataChange) + { + var dialog = Controls.Dialog( + Controls.Markdown($"**Error saving:** Unexpected response `{callbackResponse.Message?.GetType().Name ?? "null"}`."), + "Save Failed" + ).WithSize("M"); + actx.Host.UpdateArea(DialogControl.DialogArea, dialog); + return; + } + var responseMsg = dataChange.Message; if (responseMsg.Log.Status != ActivityStatus.Succeeded) { diff --git a/src/MeshWeaver.AI/ChatClientAgentFactory.cs b/src/MeshWeaver.AI/ChatClientAgentFactory.cs index 5a92a08dc..bae7758c7 100644 --- a/src/MeshWeaver.AI/ChatClientAgentFactory.cs +++ b/src/MeshWeaver.AI/ChatClientAgentFactory.cs @@ -256,185 +256,8 @@ protected virtual IEnumerable GetAgentTools( var delegationTool = DelegationTool.CreateUnifiedDelegationTool( agentConfig, hierarchyAgents, - executeAsync: (agentName, task, context, cancellationToken) => - { - // Resolve the target agent by name (strip path prefix if present) - var targetId = agentName.Split('/').Last(); - if (!allAgents.TryGetValue(targetId, out var targetAgent)) - { - return Task.FromResult(new DelegationResult - { - AgentName = agentName, - Task = task, - Result = $"Agent '{agentName}' not found", - Success = false - }); - } - - var execCtx = chat.ExecutionContext; - var userIdentity = execCtx?.UserAccessContext?.ObjectId ?? "(no-user)"; - - // Guard: limit delegation depth to prevent recursive delegation. - // Count segments after _Thread/ — each delegation adds msgId/subThreadId (2 segments per level). - // Root thread: Org/_Thread/threadId → 0 extra segments → depth 0 - // 1st delegation: Org/_Thread/threadId/msgId/subId → 2 extra → depth 1 - // 2nd delegation: Org/_Thread/threadId/msgId/subId/msgId2/subId2 → 4 extra → depth 2 - var threadPath = execCtx?.ThreadPath ?? ""; - var threadIdx = threadPath.IndexOf("/_Thread/"); - var depth = 0; - if (threadIdx >= 0) - { - var afterThread = threadPath[(threadIdx + "/_Thread/".Length)..]; - var segments = afterThread.Split('/').Length; - depth = (segments - 1) / 2; // subtract the thread id, each delegation = 2 segments - } - if (depth >= 2) - { - Logger.LogWarning("[Delegation] Max delegation depth ({Depth}) reached at {ThreadPath} for {Source} → {Target}", - depth, threadPath, agentConfig.Id, targetId); - return Task.FromResult(new DelegationResult - { - AgentName = targetId, Task = task, - Result = $"Maximum delegation depth reached ({depth}). You must handle this task directly without further delegation.", - Success = false - }); - } - - Logger.LogInformation( - "[Delegation] {Source} → {Target}, user={User}, depth={Depth}, task={Task}", - agentConfig.Id, targetId, userIdentity, depth, task.Length > 100 ? task[..97] + "..." : task); - - if (execCtx == null) - { - return Task.FromResult(new DelegationResult - { - AgentName = targetId, - Task = task, - Result = "No execution context available for delegation", - Success = false - }); - } - - // TCS completed by subscription callbacks — framework awaits this, not our code - var tcs = new TaskCompletionSource(); - var meshService = Hub.ServiceProvider.GetRequiredService(); - var workspace = Hub.GetWorkspace(); - - // Access context is restored by WrapToolWithAccessContext — no need to set it here. - - var parentMsgPath = $"{execCtx.ThreadPath}/{execCtx.ResponseMessageId}"; - // MainNode for sub-thread = parent thread's MainNode (content node). - // ContextPath comes from the thread execution context which is set from - // the thread node's MainNode (e.g., "PartnerRe/AIConsulting"). - var mainEntityPath = execCtx.ContextPath ?? context ?? execCtx.ThreadPath; - - // Build sub-thread with pre-populated messages - var (subThreadNode, userMsgId, responseMsgId) = ThreadNodeType.BuildThreadWithMessages( - parentMsgPath, task, createdBy: execCtx.UserAccessContext?.ObjectId, - agentName: targetId); - subThreadNode = subThreadNode with { MainNode = mainEntityPath }; - var subThreadPath = subThreadNode.Path!; - - // Store delegation path keyed by display name for parallel-safe lookup. - var delegationDisplayName = $"Delegating to {targetId}..."; - chat.DelegationPaths[delegationDisplayName] = subThreadPath; - chat.LastDelegationPath = subThreadPath; - chat.UpdateDelegationStatus?.Invoke(delegationDisplayName); - - // Create cells FIRST, then thread. WatchForExecution starts execution - // when it sees IsExecuting=true on the thread. - Logger.LogInformation("[Delegation] Creating cells for sub-thread {Path}: user={UserMsgId}, response={ResponseMsgId}", subThreadPath, userMsgId, responseMsgId); - meshService.CreateNode(new MeshNode(userMsgId, subThreadPath) - { - NodeType = ThreadMessageNodeType.NodeType, MainNode = mainEntityPath, - Content = new ThreadMessage - { - Role = "user", Text = task, Timestamp = DateTime.UtcNow, - Type = ThreadMessageType.ExecutedInput, CreatedBy = execCtx.UserAccessContext?.ObjectId - } - }).Subscribe(_ => { }, error => Logger.LogError(error, "[Delegation] User cell creation failed for {Path}", subThreadPath)); - - meshService.CreateNode(new MeshNode(responseMsgId, subThreadPath) - { - NodeType = ThreadMessageNodeType.NodeType, MainNode = mainEntityPath, - Content = new ThreadMessage - { - Role = "assistant", Text = "", Timestamp = DateTime.UtcNow, - Type = ThreadMessageType.AgentResponse, AgentName = targetId - } - }).Subscribe(_ => { }, error => Logger.LogError(error, "[Delegation] Response cell creation failed for {Path}", subThreadPath)); - - Logger.LogInformation("[Delegation] Creating sub-thread at {Path}", subThreadPath); - meshService.CreateNode(subThreadNode).Subscribe( - _ => - { - Logger.LogInformation("[Delegation] Sub-thread created at {Path}, watching for completion", subThreadPath); - - // Watch the sub-thread's state via ObserveQuery — when IsExecuting becomes false, delegation is done. - var meshSvc = Hub.ServiceProvider.GetService(); - bool IsComplete(QueryResultChange c) - { - foreach (var n in c.Items) - if (n.Content is MeshThread { IsExecuting: false, PendingUserMessage: null }) - return true; - return false; - } - var filtered = System.Reactive.Linq.Observable.Where( - meshSvc!.ObserveQuery(MeshQueryRequest.FromQuery($"path:{subThreadPath}")), - IsComplete); - System.Reactive.Linq.Observable.Timeout( - System.Reactive.Linq.Observable.Take(filtered, 1), - TimeSpan.FromSeconds(30)) - .Subscribe( - change => - { - var node = change.Items.FirstOrDefault(); - var thread = node?.Content as MeshThread; - // Read the response from the response message cell - var responseText = $"Delegation to {targetId} completed."; - Logger.LogInformation("[Delegation] Completed: {Path}", subThreadPath); - tcs.TrySetResult(new DelegationResult - { - AgentName = targetId, Task = task, - Result = responseText, - Success = true, - ThreadId = subThreadPath - }); - }, - ex => - { - Logger.LogWarning(ex, "[Delegation] Watch failed for {Path}", subThreadPath); - tcs.TrySetResult(new DelegationResult - { - AgentName = targetId, Task = task, - Result = $"Delegation timed out: {ex.Message}", - Success = false, ThreadId = subThreadPath - }); - }); - }, - error => - { - Logger.LogWarning(error, "[Delegation] Failed to create sub-thread at {Path} for {Target}, Hub={Hub}", subThreadPath, targetId, Hub.Address); - tcs.TrySetResult(new DelegationResult - { - AgentName = targetId, Task = task, - Result = $"Node creation failed: {error.Message}", Success = false - }); - }); - - // Register cancellation to prevent infinite hang if sub-thread routing fails - cancellationToken.Register(() => - { - tcs.TrySetResult(new DelegationResult - { - AgentName = targetId, Task = task, - Result = $"Delegation to {targetId} was cancelled.", - Success = false - }); - }); - - return tcs.Task; - }, + executeAsync: (agentName, task, context, ct) => + ExecuteDelegationAsync(agentConfig, allAgents, chat, agentName, task, context, ct), Logger); Logger.LogInformation("Created unified delegation tool for agent {AgentName} with {HierarchyCount} hierarchy agents", @@ -459,6 +282,168 @@ bool IsComplete(QueryResultChange c) } } + /// + /// Dispatches a sub-thread and yields its streaming text deltas as . + /// + /// The sub-thread is created fire-and-forget via IMeshService.CreateNode (no await on + /// completion). Its response-message cell is observed through a workspace remote stream; each + /// incremental delta is yielded up to the , and via + /// that — through the parent agent's streaming response — into the parent's response bubble. + /// + /// No , no , no + /// ObserveQuery. The only awaits here are on the channel reader which drains on + /// cancellation or on the sub-thread's CompletedAt flip — neither touches the hub scheduler + /// (both run on the Task.Run thread pool). + /// + private async IAsyncEnumerable ExecuteDelegationAsync( + AgentConfiguration agentConfig, + IReadOnlyDictionary allAgents, + IAgentChat chat, + string agentName, + string task, + string? context, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + // Resolve target agent (strip path prefix if present). + var targetId = agentName.Split('/').Last(); + if (!allAgents.TryGetValue(targetId, out _)) + { + yield return $"Agent '{agentName}' not found"; + yield break; + } + + var execCtx = chat.ExecutionContext; + if (execCtx == null) + { + yield return "No execution context available for delegation"; + yield break; + } + + // Guard: limit delegation depth. See comment on original version for segment math. + var threadPath = execCtx.ThreadPath; + var threadIdx = threadPath.IndexOf("/_Thread/", StringComparison.Ordinal); + var depth = 0; + if (threadIdx >= 0) + { + var afterThread = threadPath[(threadIdx + "/_Thread/".Length)..]; + var segments = afterThread.Split('/').Length; + depth = (segments - 1) / 2; + } + if (depth >= 2) + { + Logger.LogWarning("[Delegation] Max depth reached at {ThreadPath}: {Source} → {Target}", + threadPath, agentConfig.Id, targetId); + yield return $"Maximum delegation depth reached ({depth}). Handle this task directly."; + yield break; + } + + Logger.LogInformation("[Delegation] {Source} → {Target}, depth={Depth}, task={Task}", + agentConfig.Id, targetId, depth, task.Length > 100 ? task[..97] + "..." : task); + + var meshService = Hub.ServiceProvider.GetRequiredService(); + var parentMsgPath = $"{threadPath}/{execCtx.ResponseMessageId}"; + var mainEntityPath = execCtx.ContextPath ?? context ?? threadPath; + + // Build the sub-thread with IsExecuting=true + PendingUserMessage so its hub's + // WatchForExecution starts streaming on activation. + var (subThreadNode, userMsgId, responseMsgId) = ThreadNodeType.BuildThreadWithMessages( + parentMsgPath, task, + createdBy: execCtx.UserAccessContext?.ObjectId, + agentName: targetId); + subThreadNode = subThreadNode with { MainNode = mainEntityPath }; + var subThreadPath = subThreadNode.Path!; + var responsePath = $"{subThreadPath}/{responseMsgId}"; + + // Stamp the delegation path so the parent's bubble can render the inline link. + var delegationDisplayName = $"Delegating to {targetId}..."; + chat.DelegationPaths[delegationDisplayName] = subThreadPath; + chat.LastDelegationPath = subThreadPath; + chat.UpdateDelegationStatus?.Invoke(delegationDisplayName); + + Logger.LogInformation("[Delegation] Dispatch sub-thread {Path}: user={UserMsgId}, response={ResponseMsgId}", + subThreadPath, userMsgId, responseMsgId); + + // Create satellite cells + thread node reactively (no await). + meshService.CreateNode(new MeshNode(userMsgId, subThreadPath) + { + NodeType = ThreadMessageNodeType.NodeType, + MainNode = mainEntityPath, + Content = new ThreadMessage + { + Role = "user", Text = task, Timestamp = DateTime.UtcNow, + Type = ThreadMessageType.ExecutedInput, + CreatedBy = execCtx.UserAccessContext?.ObjectId + } + }).Subscribe(_ => { }, + error => Logger.LogDebug(error, "[Delegation] User cell create for {Path} returned error", subThreadPath)); + + meshService.CreateNode(new MeshNode(responseMsgId, subThreadPath) + { + NodeType = ThreadMessageNodeType.NodeType, + MainNode = mainEntityPath, + Content = new ThreadMessage + { + Role = "assistant", Text = "", Timestamp = DateTime.UtcNow, + Type = ThreadMessageType.AgentResponse, AgentName = targetId + } + }).Subscribe(_ => { }, + error => Logger.LogDebug(error, "[Delegation] Response cell create for {Path} returned error", subThreadPath)); + + meshService.CreateNode(subThreadNode).Subscribe( + _ => Logger.LogInformation("[Delegation] Sub-thread created at {Path}", subThreadPath), + error => Logger.LogWarning(error, "[Delegation] Sub-thread create failed at {Path}", subThreadPath)); + + yield return $"\n\n**Delegating to {targetId}…**\n\n"; + + // Open a channel fed by the sub-thread's response-cell remote stream. We yield + // each text delta as it arrives (computed against lastText so we never double-emit). + var channel = System.Threading.Channels.Channel.CreateUnbounded( + new System.Threading.Channels.UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = false + }); + + var workspace = Hub.GetWorkspace(); + var lastText = ""; + var subscription = workspace.GetRemoteStream( + new Address(responsePath), new MeshNodeReference()) + ?.Subscribe( + change => + { + var msg = change.Value?.Content as ThreadMessage; + if (msg == null) return; + var current = msg.Text ?? ""; + if (current.Length > lastText.Length) + { + var delta = current[lastText.Length..]; + lastText = current; + channel.Writer.TryWrite(delta); + } + if (msg.CompletedAt is not null) + channel.Writer.TryComplete(); + }, + ex => channel.Writer.TryComplete(ex), + () => channel.Writer.TryComplete()); + + // Safety timeout so a never-completing sub-thread can't pin this iterator forever. + using var timeout = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + using var linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeout.Token); + + try + { + await foreach (var delta in channel.Reader.ReadAllAsync(linked.Token)) + { + yield return delta; + } + } + finally + { + subscription?.Dispose(); + Logger.LogInformation("[Delegation] Stream closed for sub-thread {Path}", subThreadPath); + } + } + /// /// Resolves a plugin reference to AITool instances. /// Built-in plugin "Mesh" is resolved directly; custom plugins are resolved from DI. diff --git a/src/MeshWeaver.AI/Completion/AutocompleteClient.cs b/src/MeshWeaver.AI/Completion/AutocompleteClient.cs index c9220e55c..070d76f8d 100644 --- a/src/MeshWeaver.AI/Completion/AutocompleteClient.cs +++ b/src/MeshWeaver.AI/Completion/AutocompleteClient.cs @@ -41,11 +41,13 @@ public async Task GetCompletionsAsync( new AutocompleteRequest(query, context?.Context), o => o.WithTarget(address))!; var callbackResponse = await hub.RegisterCallback(delivery, (d, _) => Task.FromResult(d), timeoutCts.Token); - var responseMsg = ((IMessageDelivery)callbackResponse).Message; - if (responseMsg?.Items != null) + // Tolerate hub-level failures (target unreachable, timeout as DeliveryFailure) + // and any unexpected response type — skipping is the historical behaviour. + if (callbackResponse is IMessageDelivery ok + && ok.Message?.Items != null) { - allItems = allItems.AddRange(responseMsg.Items); + allItems = allItems.AddRange(ok.Message.Items); } } catch diff --git a/src/MeshWeaver.AI/Data/Agent/Coder.md b/src/MeshWeaver.AI/Data/Agent/Coder.md index cba425421..647ad458e 100644 --- a/src/MeshWeaver.AI/Data/Agent/Coder.md +++ b/src/MeshWeaver.AI/Data/Agent/Coder.md @@ -16,6 +16,24 @@ delegations: You are **Coder**, the node type engineering agent. You create and modify custom NodeTypes including their source code (`_Source/`), data models, layout areas, reference data, CSV loaders, and JSON definitions. +# Decision Rule: NodeType vs Markdown + +When the user describes a **data model, object type, custom entity, or interactive view** — e.g. "social media posts with a calendar", "a task tracker", "risk model with charts", "build X as code" — you build a **NodeType**: a `NodeType` JSON + `_Source/` C# files + at least one instance JSON. + +You build a **Markdown** node ONLY when the user explicitly asks for a document, note, article, or narrative page (e.g. "write a doc about X", "draft a changelog", "add an FAQ page"). + +**Never** use a Markdown node as a shortcut for something that should be typed data. If in doubt, build a NodeType — a user who wanted Markdown will say so. + +## Canonical Example + +The walkthrough at [SocialMedia model node type](@@Doc/DataMesh/SocialMedia) is the reference implementation. It has exactly the shape you should produce: + +- `Post.json`, `Profile.json` — NodeType definitions with a `configuration` lambda +- `Post/_Source/*.cs`, `Profile/_Source/*.cs` — content record, reference data (`Platform`), layout areas +- `Post/Post-001.json`, `Profile/Roland-LinkedIn.json` — instances alongside (IDs are meaningful — never `SamplePost`/`SampleProfile`) + +When asked to build "X as code" or "X as a model", open that example, mirror its shape, then adapt to the user's domain. + # How Node Types Work A NodeType is a MeshNode with `nodeType: "NodeType"` whose `content` contains a `NodeTypeDefinition` with a `configuration` field. The configuration is a C# lambda expression compiled at startup. @@ -234,7 +252,11 @@ When asked to create a node type: - CSV loaders if loading external data 5. **Create the NodeType JSON** with the configuration lambda 6. **Upload CSV files** to the content collection if needed -7. **Verify** by getting the created nodes +7. **Verify compilation** — this step is NOT optional: + - Call `GetDiagnostics('@{nodeTypePath}')` after every NodeType create/update. + - If `status: "Error"` → read `error`, fix the broken source or the NodeType JSON (often the fix is adding a `sources` entry pointing at another NodeType's `_Source` via `$self` or an absolute path), write the fix with `Update`/`Patch`, and re-check. + - Repeat until `status: "Ok"`. Only then is the NodeType "done". + - Alternative: a plain `Get('@{path}')` on any instance (or the NodeType itself) wraps the JSON with a `compilationError` field when the type failed to compile — useful when you want the node data and the compile status together. # Business Rules & Calculations @@ -244,7 +266,7 @@ For domain-specific logic (financial models, reinsurance cession, risk analysis, 2. **Business Rules** — pure C# calculation engines with no framework dependencies 3. **Layout Areas** — reactive charts with `Chart.Create(DataSet.Bar(...))`, filter toolbars via `host.Toolbar(model, id)`, and `host.GetDataStream(id).Select(...)` for reactive updates -See the full walkthrough with a reinsurance cession example: [Business Rules & Calculations](@@Doc/Architecture/BusinessRules) +See [SocialMedia](@@Doc/DataMesh/SocialMedia) for a plain-CRUD reference example, and [Business Rules & Calculations](@@Doc/Architecture/BusinessRules) for a chart/calculation-heavy reinsurance-cession example. For a production implementation, see: - [CededCashflows.cs](https://github.com/Systemorph/MeshWeaver.Reinsurance/blob/main/src/MeshWeaver.Reinsurance/Cession/CededCashflows.cs) — cession calculation engine @@ -311,18 +333,25 @@ When asked to create an interactive document, create a Markdown node with the ex **NEVER just describe what you would create. ALWAYS call Create, Update, or Patch to write the actual content.** If you didn't call a write tool, nothing was produced. The user expects to see a real node with real content after your work — not a description of what could be created. -- Asked to create a Markdown document? → Call `Create` with the full markdown content. -- Asked to create a NodeType? → Call `Create` for each source file and the JSON definition. +- Asked for a data model, type, or view? → Create a **NodeType**: JSON + `_Source/` `.cs` files + at least one sample instance. **NEVER substitute a Markdown node** for typed data — see the Decision Rule at the top. +- Asked for a document, article, or narrative page? → Create a Markdown node with the full content. +- Asked to create a NodeType? → Call `Create` for each source file and the JSON definition, **then call `GetDiagnostics` and don't stop until `status: "Ok"`**. - Asked to modify a node? → Call `Get` first, then `Update` with the modified content. **Every delegation MUST end with at least one write tool call.** +**A NodeType is not "created" until `GetDiagnostics` says `Ok`.** Stopping after +`Create` when compilation is failing leaves the user with a broken type and no +way to use it. Iterate on the source files / `Sources` list until it compiles. + # Tools Use the standard Mesh tools (Get, Search, Create, Update, Delete) to manage nodes. Use ContentCollection tools to upload CSV/data files. When creating `_Source/` files, create them as MeshNodes with: -- `nodeType: "Code"` +- `nodeType: "Code"` (NOT `"Markdown"` — source code files are always Code nodes) - `namespace: "{typePath}/_Source"` -- `content` containing the C# source code +- `content` shaped as `{ "$type": "CodeConfiguration", "code": "…", "language": "csharp" }` containing the C# source + +See [SocialMedia/Post/_Source](@@Doc/DataMesh/SocialMedia) for the concrete file naming and content shape to mirror. diff --git a/src/MeshWeaver.AI/Data/Agent/NodeInitializer.md b/src/MeshWeaver.AI/Data/Agent/NodeInitializer.md new file mode 100644 index 000000000..77f976f6f --- /dev/null +++ b/src/MeshWeaver.AI/Data/Agent/NodeInitializer.md @@ -0,0 +1,59 @@ +--- +nodeType: Agent +name: Node Initializer +description: Generates a Name, PascalCase Id, and inline SVG icon from a short description. Used by the New-Node dialog and the Settings icon editor. +icon: Sparkle +category: Agents +exposedInNavigator: false +modelTier: light +order: 998 +--- + +You are **Node Initializer**. Given a short free-text description of a new knowledge-graph node, produce a concise display Name, a PascalCase Id, and a minimal inline SVG icon that represents the node. + +# Output format — strict + +Respond with EXACTLY these three labelled blocks in this order, nothing else: + +``` +Name: <3-8 word human-readable display name, no quotes, no trailing punctuation> +Id: +Svg: +``` + +# SVG rules + +- Root element: `` +- 24×24 viewBox; strokes only (no filled fills unless essential to meaning); use `currentColor` so the icon inherits theme colors. +- Single line, no line breaks, no XML comments, no `` prolog, no external references (no `xlink:href` to URLs, no ``, no fonts). +- Keep the markup compact — aim for under ~400 characters. Prefer 2-6 primitive shapes (path, circle, rect, line, polyline) that clearly evoke the concept. +- The icon should be recognizable at 16×16 — avoid tiny details. + +# Examples + +Input: "Quarterly sales review presentation for the European team" +``` +Name: European Quarterly Sales Review +Id: EuropeanQuarterlySalesReview +Svg: +``` + +Input: "A checklist of onboarding tasks for new hires" +``` +Name: New Hire Onboarding Checklist +Id: NewHireOnboardingChecklist +Svg: +``` + +Input: "Notes from today's architecture design discussion" +``` +Name: Architecture Design Discussion Notes +Id: ArchitectureDesignDiscussionNotes +Svg: +``` + +# Guidelines + +- If the description is empty or nonsensical, still return the three blocks with generic but valid content (e.g. a document icon, a placeholder name like "Untitled Document", Id "UntitledDocument"). +- Do **not** add extra commentary, markdown fences, code blocks, or explanations around the three labelled lines. The caller parses by label prefix and anything extra breaks the parse. +- The Id must start with an uppercase letter. It must not lowercase the first letter. diff --git a/src/MeshWeaver.AI/MeshOperations.cs b/src/MeshWeaver.AI/MeshOperations.cs index 91df54a00..0663d670b 100644 --- a/src/MeshWeaver.AI/MeshOperations.cs +++ b/src/MeshWeaver.AI/MeshOperations.cs @@ -22,6 +22,7 @@ public class MeshOperations private readonly IMessageHub hub; private readonly ILogger logger; private readonly IMeshService mesh; + private readonly INodeTypeService? nodeTypeService; /// /// Callback invoked when a node is created, updated, or patched. @@ -34,16 +35,49 @@ public MeshOperations(IMessageHub hub) this.hub = hub; this.logger = hub.ServiceProvider.GetRequiredService>(); this.mesh = hub.ServiceProvider.GetRequiredService(); + this.nodeTypeService = hub.ServiceProvider.GetService(); } /// - /// Resolves @ prefix and quotes from path. Example: @graph/org1 -> graph/org1, "@content/My File.md" -> content/My File.md + /// Looks up the cached compilation error for the owning NodeType of . + /// - If is a NodeType definition, checks its own path. + /// - Otherwise checks the NodeType's path. + /// Returns null if no error is recorded. + /// + private string? LookupCompilationError(MeshNode node) + { + if (nodeTypeService == null) return null; + var nodeTypePath = node.Content is Graph.Configuration.NodeTypeDefinition + ? node.Path + : node.NodeType; + return !string.IsNullOrEmpty(nodeTypePath) + ? nodeTypeService.GetCompilationError(nodeTypePath) + : null; + } + + /// + /// Resolves @ prefix and normalises agent-emitted formatting noise. + /// Models / autocomplete frequently wrap spaced filenames in quotes ("foo bar.docx", + /// 'foo bar.docx'), put quotes around different segments, or include surrounding + /// whitespace. None of those characters are legal mesh-path content, so we strip + /// them regardless of position. Examples: + /// @graph/org1 → graph/org1 + /// "@content/My File.md" → content/My File.md + /// @/Org/content/"My File.docx" → /Org/content/My File.docx + /// @/Org/"content/My File.docx" → /Org/content/My File.docx + /// @"/Org/content/My File.docx" → /Org/content/My File.docx + /// @/Org/content/'My File.docx' → /Org/content/My File.docx + /// " @/Org/content/My File.docx " → /Org/content/My File.docx /// public static string ResolvePath(string path) { - // Strip surrounding quotes (autocomplete wraps spaced paths in quotes) - if (path.Length >= 2 && path[0] == '"' && path[^1] == '"') - path = path[1..^1]; + if (string.IsNullOrEmpty(path)) + return path; + + // Strip surrounding/inner whitespace and quote characters in one pass. + path = path.Trim(); + if (path.IndexOfAny(['"', '\'']) >= 0) + path = path.Replace("\"", string.Empty).Replace("'", string.Empty); if (path.StartsWith("@")) return path[1..]; @@ -155,6 +189,11 @@ public async Task Get(string path) await foreach (var node in mesh.QueryAsync( MeshQueryRequest.FromQuery($"path:{resolvedPath}"))) { + var compileError = LookupCompilationError(node); + if (compileError != null) + return JsonSerializer.Serialize( + new { node, compilationError = compileError }, + hub.JsonSerializerOptions); return JsonSerializer.Serialize(node, hub.JsonSerializerOptions); } @@ -841,4 +880,136 @@ internal async Task BuildNullContentErrorAsync(string path, string nodeT return Task.FromResult(null); } } + + /// + /// Recycles the hub at by posting a + /// . The next access re-initialises the hub — which + /// means a fresh NodeType compile and fresh data loads. Useful after fixing a + /// broken NodeType or when something is stuck in an inconsistent cached state. + /// Returns a JSON {status, path} envelope. The caller should wait ~100ms + /// before re-accessing so the grain teardown completes. + /// + public Task Recycle(string path) + { + logger.LogInformation("Recycle called with path={Path}", path); + + if (string.IsNullOrWhiteSpace(path)) + return Task.FromResult(JsonSerializer.Serialize( + new { status = "Error", message = "path is required" }, + hub.JsonSerializerOptions)); + + var resolvedPath = ResolvePath(path); + if (string.IsNullOrWhiteSpace(resolvedPath)) + return Task.FromResult(JsonSerializer.Serialize( + new { status = "Error", message = "path is required" }, + hub.JsonSerializerOptions)); + + try + { + // 1. Flush LOCAL NodeTypeService caches so a fresh compile runs on next access. + // Disposing the hub alone is not enough — NodeTypeService._compilationErrors + // and _compilationTasks survive hub teardown and would keep serving stale + // errors. + nodeTypeService?.InvalidateCache(resolvedPath); + + // 2. Broadcast the invalidation across silos via IMeshChangeFeed. Every silo's + // NodeTypeService subscribes to this feed and calls InvalidateCache locally + // when it sees an event for a tracked NodeType path. + var changeFeed = hub.ServiceProvider.GetService(); + if (changeFeed != null) + { + var segments = resolvedPath.Split('/'); + var id = segments.Length > 0 ? segments[^1] : resolvedPath; + var ns = segments.Length > 1 ? string.Join("/", segments[..^1]) : ""; + changeFeed.Publish(new MeshChangeEvent( + Namespace: ns, + Id: id, + Path: resolvedPath, + Kind: MeshChangeKind.Updated, + NodeType: MeshNode.NodeTypePath, + Version: 0, + Timestamp: DateTimeOffset.UtcNow)); + } + + // 3. Dispose the hub so the next request re-initialises with fresh config. + hub.Post(new DisposeRequest(), o => o.WithTarget(new Address(resolvedPath))); + return Task.FromResult(JsonSerializer.Serialize( + new + { + status = "Recycled", + path = resolvedPath, + message = "DisposeRequest posted + cache invalidation broadcast via MeshChangeFeed. Wait ~100ms before the next access." + }, + hub.JsonSerializerOptions)); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Error recycling {Path}", resolvedPath); + return Task.FromResult(JsonSerializer.Serialize( + new { status = "Error", path = resolvedPath, message = ex.Message }, + hub.JsonSerializerOptions)); + } + } + + /// + /// Returns compilation diagnostics for a NodeType or an instance of one. + /// The response is JSON with status (Error / Ok / + /// Unknown) and, when relevant, the error text from the last compile. + /// Used by the Coder agent's self-verification loop after creating / updating + /// a NodeType. + /// + public async Task GetDiagnostics(string path) + { + logger.LogInformation("GetDiagnostics called with path={Path}", path); + + if (string.IsNullOrWhiteSpace(path)) + return JsonSerializer.Serialize( + new { status = "Error", message = "path is required" }, + hub.JsonSerializerOptions); + + var resolvedPath = ResolvePath(path); + if (nodeTypeService == null) + return JsonSerializer.Serialize( + new { status = "Unknown", message = "INodeTypeService not registered on this hub" }, + hub.JsonSerializerOptions); + + // Resolve the owning NodeType path: either the path itself (if it IS a NodeType) + // or the NodeType of the instance at that path. + string? nodeTypePath = null; + await foreach (var node in mesh.QueryAsync(MeshQueryRequest.FromQuery($"path:{resolvedPath}"))) + { + nodeTypePath = node.Content is Graph.Configuration.NodeTypeDefinition + ? node.Path + : node.NodeType; + break; + } + + if (string.IsNullOrEmpty(nodeTypePath)) + return JsonSerializer.Serialize( + new { status = "Unknown", message = $"Not found: {resolvedPath}" }, + hub.JsonSerializerOptions); + + // Compiling has priority over any prior error — the error we're seeing is stale + // and a fresh result is on its way. Tell the caller to wait and retry. + if (nodeTypeService.IsCompiling(nodeTypePath)) + { + var startedAt = nodeTypeService.GetCompilationStartedAt(nodeTypePath); + var elapsedMs = startedAt is null + ? (long?)null + : (long)(DateTimeOffset.UtcNow - startedAt.Value).TotalMilliseconds; + return JsonSerializer.Serialize( + new { status = "Compiling", nodeTypePath, elapsedMs }, + hub.JsonSerializerOptions); + } + + var err = nodeTypeService.GetCompilationError(nodeTypePath); + if (string.IsNullOrEmpty(err)) + return JsonSerializer.Serialize( + new { status = "Ok", nodeTypePath }, + hub.JsonSerializerOptions); + + return JsonSerializer.Serialize( + new { status = "Error", nodeTypePath, error = err }, + hub.JsonSerializerOptions); + } } diff --git a/src/MeshWeaver.AI/MeshPlugin.cs b/src/MeshWeaver.AI/MeshPlugin.cs index 231bf58ab..17c38bcce 100644 --- a/src/MeshWeaver.AI/MeshPlugin.cs +++ b/src/MeshWeaver.AI/MeshPlugin.cs @@ -68,6 +68,22 @@ public Task Delete( return ops.Delete(paths); } + [Description("Returns compilation diagnostics for a NodeType or an instance of one. Status is 'Ok' when the type compiled cleanly, 'Error' with a detailed message when it failed, or 'Unknown' when no compile has happened yet. Use this after creating/updating a NodeType to verify it actually compiles — a NodeType that doesn't compile is not 'done'.")] + public Task GetDiagnostics( + [Description("Path to a NodeType (e.g., @Systemorph/SocialMedia/Profile) or to any instance of one")] string path) + { + RestoreAccessContext(); + return ops.GetDiagnostics(ResolveContextPath(path)); + } + + [Description("Recycles the hub at the given path by posting DisposeRequest. Forces a fresh hub initialization on the next access — use this after fixing a broken NodeType, after editing the `sources` list, or whenever a grain is stuck. Returns {status:'Recycled', path}. Wait ~100ms before the next access so the grain teardown completes.")] + public Task Recycle( + [Description("Path to the node (e.g., @Systemorph/SocialMedia/Profile). Use the NodeType path to recycle the whole type; use an instance path to recycle just that instance's hub.")] string path) + { + RestoreAccessContext(); + return ops.Recycle(ResolveContextPath(path)); + } + /// /// Restores the user's AccessContext from . /// AsyncLocal doesn't flow reliably through the AI framework's streaming + tool @@ -108,6 +124,7 @@ public IList CreateTools() AIFunctionFactory.Create(Get), AIFunctionFactory.Create(Search), AIFunctionFactory.Create(NavigateTo), + AIFunctionFactory.Create(GetDiagnostics), ]; } @@ -125,6 +142,8 @@ public IList CreateAllTools() AIFunctionFactory.Create(Update), AIFunctionFactory.Create(Patch), AIFunctionFactory.Create(Delete), + AIFunctionFactory.Create(GetDiagnostics), + AIFunctionFactory.Create(Recycle), ]; } } diff --git a/src/MeshWeaver.AI/Plugins/DelegationTool.cs b/src/MeshWeaver.AI/Plugins/DelegationTool.cs index 1a283a12e..61ea93e73 100644 --- a/src/MeshWeaver.AI/Plugins/DelegationTool.cs +++ b/src/MeshWeaver.AI/Plugins/DelegationTool.cs @@ -1,5 +1,6 @@ using System.Collections.Immutable; using System.ComponentModel; +using System.Runtime.CompilerServices; using System.Text.Json; using Microsoft.Extensions.AI; using Microsoft.Extensions.Logging; @@ -7,33 +8,16 @@ namespace MeshWeaver.AI.Plugins; /// -/// Represents a delegation result that can be expanded in the UI. +/// Result record preserved for tests + . +/// No longer used as the delegation tool return shape — the tool now yields +/// chunks directly. /// public record DelegationResult { - /// - /// The agent that was delegated to. - /// public required string AgentName { get; init; } - - /// - /// The task that was delegated. - /// public required string Task { get; init; } - - /// - /// The result from the delegated agent. - /// public required string Result { get; init; } - - /// - /// Whether the delegation was successful. - /// public bool Success { get; init; } = true; - - /// - /// The thread ID used for the delegation (for isolated context). - /// public string? ThreadId { get; init; } } @@ -46,86 +30,27 @@ public record DelegationInfo(string AgentPath, string Description); /// /// Creates delegation tools for agents that support isolated context per delegation. -/// Each delegation creates a new thread for the target agent, and the result -/// is returned to the parent agent as a tool result. +/// +/// The tool signature is so that the sub-thread's +/// streaming text flows back as incremental chunks. Microsoft.Extensions.AI aggregates +/// the yielded chunks as the tool result; meanwhile, a side-channel delta push keeps the +/// parent's response bubble updated in real time so the user sees sub-thread progress +/// inline without waiting for completion. +/// +/// No more — the previous Task-returning shape forced the +/// FunctionInvokingChatClient to block on sub-thread completion, which deadlocks under +/// Orleans when the child's completion patch queues behind the parent hub scheduler. /// public static class DelegationTool { - /// - /// Creates a delegation tool that delegates to a specific agent with an isolated thread. - /// The delegation is visible in the UI as a tool invocation. - /// - /// Name of the agent to delegate to - /// Description of when to use this agent - /// Function to execute the delegation - /// Optional logger - /// An AITool for delegation - public static AITool CreateDelegationTool( - string targetAgentName, - string targetAgentDescription, - Func> executeAsync, - ILogger? logger = null) - { - async Task DelegateToAgent( - [Description("The task or instructions to send to the specialized agent. Be specific about what you need.")] string task, - CancellationToken cancellationToken) - { - logger?.LogInformation("Delegating to {TargetAgent}: {Task}", targetAgentName, task); - - try - { - var result = await executeAsync(task, cancellationToken); - - if (result.Success) - { - logger?.LogInformation("Delegation to {TargetAgent} completed successfully", targetAgentName); - return result.Result; - } - else - { - logger?.LogWarning("Delegation to {TargetAgent} failed: {Result}", targetAgentName, result.Result); - return $"Delegation to {targetAgentName} failed: {result.Result}"; - } - } - catch (OperationCanceledException) - { - logger?.LogInformation("Delegation to {TargetAgent} was cancelled", targetAgentName); - throw; - } - catch (Exception ex) - { - logger?.LogError(ex, "Error during delegation to {TargetAgent}", targetAgentName); - return $"Error delegating to {targetAgentName}: {ex.Message}"; - } - } - - var description = $""" - {targetAgentDescription} - - This tool delegates the task to the {targetAgentName} agent, which has specialized capabilities. - The agent will execute the task in its own isolated context and return the result. - Wait for the result before continuing with your response. - """; - - return AIFunctionFactory.Create( - DelegateToAgent, - name: $"delegate_to_{targetAgentName}", - description: description); - } - /// /// Creates a unified delegation tool that can delegate to any available agent. /// Each delegation uses an isolated thread for the target agent. /// - /// The current agent's configuration - /// All agents in the namespace hierarchy - /// Function to execute delegations - /// Optional logger - /// An AITool for unified delegation public static AITool CreateUnifiedDelegationTool( AgentConfiguration currentAgent, IReadOnlyList hierarchyAgents, - Func> executeAsync, + Func> executeAsync, ILogger? logger = null) { var delegationInfo = ImmutableList.Empty; @@ -158,45 +83,28 @@ public static AITool CreateUnifiedDelegationTool( PropertyNamingPolicy = JsonNamingPolicy.CamelCase }); - async Task Delegate( + async IAsyncEnumerable Delegate( [Description("The name of the agent to delegate to. Use the agentPath from the available agents.")] string agentName, [Description("The task or instructions for the delegated agent. Be specific about what you need.")] string task, [Description("Optional: the node path to use as context for this delegation (e.g., 'OrgA/my-doc'). When omitted, inherits the parent context. Set explicitly when delegating parallel work on different documents.")] string? context = null, - CancellationToken cancellationToken = default) + [EnumeratorCancellation] CancellationToken cancellationToken = default) { - logger?.LogInformation("Delegating to {AgentName}: {Task}, context={Context}", agentName, task, context ?? "(inherited)"); - - try - { - var result = await executeAsync(agentName, task, context, cancellationToken); + logger?.LogInformation("Delegating to {AgentName}: {Task}, context={Context}", + agentName, task, context ?? "(inherited)"); - if (result.Success) - { - logger?.LogInformation("Delegation to {AgentName} completed successfully", agentName); - return result.Result; - } - else - { - logger?.LogWarning("Delegation to {AgentName} failed: {Result}", agentName, result.Result); - return $"Delegation to {agentName} failed: {result.Result}"; - } - } - catch (OperationCanceledException) + await foreach (var chunk in executeAsync(agentName, task, context, cancellationToken) + .WithCancellation(cancellationToken)) { - logger?.LogInformation("Delegation to {AgentName} was cancelled", agentName); - throw; - } - catch (Exception ex) - { - logger?.LogError(ex, "Error during delegation to {AgentName}", agentName); - return $"Error delegating to {agentName}: {ex.Message}"; + yield return chunk; } + + logger?.LogInformation("Delegation to {AgentName} stream completed", agentName); } var description = $""" Delegate to a specialized agent when the request matches their expertise. Each delegation runs in an isolated context - the agent won't see previous conversation history. - Wait for the result before continuing with your response. + The delegated agent's output streams back as it generates. When delegating parallel work on different documents, set the 'context' parameter to the specific node path for each delegation. This ensures each agent sees the correct document. diff --git a/src/MeshWeaver.AI/Thread.cs b/src/MeshWeaver.AI/Thread.cs index 98e59d9c2..628c117da 100644 --- a/src/MeshWeaver.AI/Thread.cs +++ b/src/MeshWeaver.AI/Thread.cs @@ -135,17 +135,26 @@ public record Thread /// public DateTime? ExecutionStartedAt { get; init; } - /// - /// Streaming text buffer — transient, never persisted. - /// Used only in-memory during active execution for the status bar preview. - /// /// /// Pending user message text — set at thread creation to auto-start execution. /// When the thread grain activates and sees this, it immediately starts streaming. /// Cleared after execution starts. + /// Legacy: still used by the auto-execute-on-creation path. New submissions + /// from the GUI populate instead. /// public string? PendingUserMessage { get; init; } + /// + /// User messages submitted by the client but not yet ingested into a round. + /// Keyed by user message id. The server-side submission watcher creates + /// satellite ThreadMessage cells from these entries and clears them once + /// the round is dispatched. Lets us do the entire submission as a single + /// atomic stream.Update on this thread node — no separate + /// CreateNodeRequest, no AppendUserMessageRequest. + /// + public ImmutableDictionary PendingUserMessages { get; init; } + = ImmutableDictionary.Empty; + /// Agent name for pending execution. public string? PendingAgentName { get; init; } @@ -264,4 +273,21 @@ public record ThreadMessage /// The server watcher truncates the thread after this id and re-ingests. /// public bool IsResubmit { get; init; } + + /// + /// Token usage reported by the model provider. Populated for AgentResponse cells + /// when the streaming finishes. Null while streaming or when the provider didn't + /// report usage (e.g., some local models). Sum of + + /// may differ from if the + /// provider includes cached / reasoning tokens. + /// + public int? InputTokens { get; init; } + public int? OutputTokens { get; init; } + public int? TotalTokens { get; init; } + + /// + /// Wall-clock time when the assistant response finished streaming. Null while + /// streaming. CompletedAt - Timestamp is the per-message duration. + /// + public DateTime? CompletedAt { get; init; } } diff --git a/src/MeshWeaver.AI/ThreadExecution.cs b/src/MeshWeaver.AI/ThreadExecution.cs index 644118ed0..b27ff26e8 100644 --- a/src/MeshWeaver.AI/ThreadExecution.cs +++ b/src/MeshWeaver.AI/ThreadExecution.cs @@ -429,11 +429,15 @@ void RespondWithError(string error) /// Async handler on the _Exec hosted hub. /// Prepares agent and await-streams the response. /// Uses UpdateMeshNode on a remote stream to push text to the response node. - /// - /// - /// Fully reactive execution handler — zero await, zero QueryAsync. - /// Subscribes to chatClient.Initialize() observable, then runs streaming in the callback. - /// The AI API streaming (GetStreamingResponseAsync) runs via hub.InvokeAsync for async I/O. + /// + /// User input received while a round is in progress is held in + /// . The submission watcher dispatches + /// a NEW round (with its own response cell) as soon as this one completes — so + /// follow-up typed input is naturally queued without cancelling the current + /// model turn. Mid-iteration drain (injecting new user input into the same + /// response without round-boundary tear-down) would require manually orchestrating + /// the tool loop instead of relying on Microsoft.Extensions.AI's auto-invocation; + /// that's intentionally NOT done here. /// internal static IMessageDelivery ExecuteMessageAsync( IMessageHub hub, @@ -690,6 +694,9 @@ void UpdateThreadExecution(Func mutate) var ct = executionCts.Token; var responseText = new StringBuilder(); capturedResponseText = responseText; + int? inputTokens = null; + int? outputTokens = null; + int? totalTokens = null; try { logger.LogInformation("[ThreadExec] STREAMING_LOOP_ENTRY: {Time:HH:mm:ss.fff} threadPath={ThreadPath} (on thread pool)", DateTime.UtcNow, threadPath); @@ -697,6 +704,7 @@ void UpdateThreadExecution(Func mutate) // and delegations where the streaming loop is blocked. using var heartbeatSubscription = parentHub.BeginAsyncOperation(); var lastUpdate = DateTimeOffset.MinValue; + var lastPushedTextLength = 0; var pendingCalls = ImmutableDictionary.Empty; string? lastCallKey = null; @@ -735,6 +743,18 @@ void UpdateThreadExecution(Func mutate) }); } } + else if (content is UsageContent usage) + { + // Aggregate token usage across stream chunks. Providers vary — + // some report once at the end, others on every chunk; sum either way. + var d = usage.Details; + if (d?.InputTokenCount is { } it) + inputTokens = (inputTokens ?? 0) + (int)it; + if (d?.OutputTokenCount is { } ot) + outputTokens = (outputTokens ?? 0) + (int)ot; + if (d?.TotalTokenCount is { } tt) + totalTokens = (totalTokens ?? 0) + (int)tt; + } else if (content is FunctionResultContent functionResult) { logger.LogDebug("[ThreadExec] TOOL_RESULT: {Time:HH:mm:ss.fff} callId={CallId}, success={Success}, resultLen={Length}", @@ -790,6 +810,9 @@ void UpdateThreadExecution(Func mutate) // Push streaming content at ~1/3sec — reduced frequency to avoid // overloading the grain scheduler (messages expire if queue backs up). + // Push as a TEXT DELTA: we send only the new characters since the last + // push (tracked by lastPushedTextLength). The response cell appends it, + // so we never ship the whole growing string every tick. if (DateTimeOffset.UtcNow - lastUpdate > TimeSpan.FromMilliseconds(3000)) { // Stamp delegation paths on any unmatched delegation tool calls @@ -802,19 +825,47 @@ void UpdateThreadExecution(Func mutate) return e; }).ToImmutableList(); - PushToResponseMessage(responseText.ToString(), toolCallLog, nodeChangeLog, - request.AgentName, request.ModelName); + var delta = responseText.Length > lastPushedTextLength + ? responseText.ToString(lastPushedTextLength, responseText.Length - lastPushedTextLength) + : null; + // First push replaces the "Generating response…" placeholder; subsequent + // pushes append deltas only. + var isFirstPush = lastPushedTextLength == 0; + lastPushedTextLength = responseText.Length; + parentHub.Post(new UpdateThreadMessageContent + { + Text = isFirstPush ? responseText.ToString() : null, + TextDelta = isFirstPush ? null : delta, + ToolCalls = toolCallLog, + UpdatedNodes = nodeChangeLog, + AgentName = request.AgentName, + ModelName = request.ModelName + }, o => o.WithTarget(new Address(responsePath))); lastUpdate = DateTimeOffset.UtcNow; } } - // Final update — aggregate node changes (merges sub-thread changes with min/max versions) + // Final update — aggregate node changes (merges sub-thread changes with min/max versions), + // include token usage + completion timestamp so the cell can show duration / tokens. var aggregatedChanges = AggregateNodeChanges(nodeChangeLog); - logger.LogInformation("[ThreadExec] EXECUTION_COMPLETE: {Time:HH:mm:ss.fff} threadPath={ThreadPath}, responseLength={Length}, toolCalls={ToolCalls}", - DateTime.UtcNow, threadPath, responseText.Length, toolCallLog.Count); + if (totalTokens is null && (inputTokens.HasValue || outputTokens.HasValue)) + totalTokens = (inputTokens ?? 0) + (outputTokens ?? 0); + logger.LogInformation("[ThreadExec] EXECUTION_COMPLETE: {Time:HH:mm:ss.fff} threadPath={ThreadPath}, responseLength={Length}, toolCalls={ToolCalls}, tokens={In}/{Out}/{Total}", + DateTime.UtcNow, threadPath, responseText.Length, toolCallLog.Count, + inputTokens, outputTokens, totalTokens); var finalText = responseText.ToString(); - PushToResponseMessage(finalText, toolCallLog, aggregatedChanges, - request.AgentName, request.ModelName); + parentHub.Post(new UpdateThreadMessageContent + { + Text = finalText, + ToolCalls = toolCallLog, + UpdatedNodes = aggregatedChanges, + AgentName = request.AgentName, + ModelName = request.ModelName, + InputTokens = inputTokens, + OutputTokens = outputTokens, + TotalTokens = totalTokens, + CompletedAt = DateTime.UtcNow + }, o => o.WithTarget(new Address(responsePath))); // Clear streaming state UpdateThreadExecution(t => t with { diff --git a/src/MeshWeaver.AI/ThreadInput.cs b/src/MeshWeaver.AI/ThreadInput.cs new file mode 100644 index 000000000..cea3b6943 --- /dev/null +++ b/src/MeshWeaver.AI/ThreadInput.cs @@ -0,0 +1,107 @@ +using MeshWeaver.Data; +using MeshWeaver.Graph; +using MeshWeaver.Mesh; +using MeshWeaver.Messaging; +using MeshThread = MeshWeaver.AI.Thread; + +namespace MeshWeaver.AI; + +/// +/// Testable, Blazor-free helpers for appending user input into a thread. +/// +/// The whole submission is one atomic workspace.UpdateMeshNode on the +/// thread node — adding the new id to UserMessageIds and stashing the +/// message payload in . The server +/// watcher creates the satellite cell and dispatches the next round. +/// +/// This replaces the legacy two-message dance (CreateNodeRequest + +/// AppendUserMessageRequest), eliminating the duplicate-dispatch races caused +/// by interleaved fire-and-forget posts. +/// +public static class ThreadInput +{ + private static string NewId() => Guid.NewGuid().ToString("N")[..8]; + + /// + /// Pure: builds a user record. No I/O. + /// + public static ThreadMessage CreateUserMessage( + string text, + string? createdBy = null, + string? authorName = null, + string? agentName = null, + string? modelName = null, + string? contextPath = null, + IReadOnlyList? attachments = null) => + new() + { + Role = "user", + Text = text, + AuthorName = authorName, + CreatedBy = createdBy, + AgentName = agentName, + ModelName = modelName, + ContextPath = contextPath, + Attachments = attachments, + Timestamp = DateTime.UtcNow, + Type = ThreadMessageType.ExecutedInput + }; + + /// + /// Atomically appends a user message to via a + /// single workspace.UpdateMeshNode on the thread's MeshNode. Returns + /// the generated message id. The server-side submission watcher creates the + /// satellite cell from and + /// dispatches the next round. + /// + public static string AppendUserInput( + IWorkspace workspace, + string threadPath, + ThreadMessage message) + { + if (string.IsNullOrEmpty(threadPath)) + throw new ArgumentException("threadPath is required", nameof(threadPath)); + ArgumentNullException.ThrowIfNull(workspace); + ArgumentNullException.ThrowIfNull(message); + + var msgId = NewId(); + + // Append the message to PendingUserMessages + UserMessageIds only. + // + // We deliberately do NOT add to Thread.Messages here — the GUI renders one + // LayoutAreaControl per id in Messages, and rendering a control before its + // satellite ThreadMessage node has been created on the hub triggers + // "Cannot access a disposed object" + spurious area-stream errors. The + // server-side submission watcher creates the satellite cell first via + // IMeshService.CreateNode and only after CreateNode confirms success does + // it add the id into Messages (in the same atomic update that flips + // IsExecuting=true alongside the response cell id). + // + // Using the no-address overload (FirstOrDefault) avoids a pre-existing + // path-vs-id key mismatch in the address-aware overload. This call expects + // to run on the thread's own hub (e.g., from the AppendUserMessageRequest + // handler) where there's exactly one node in the collection. + workspace.UpdateMeshNode(node => + { + var thread = node.Content as MeshThread ?? new MeshThread(); + var userIds = thread.UserMessageIds.Contains(msgId) + ? thread.UserMessageIds + : thread.UserMessageIds.Add(msgId); + var pending = thread.PendingUserMessages.SetItem(msgId, message); + return node with + { + Content = thread with + { + UserMessageIds = userIds, + PendingUserMessages = pending, + PendingAgentName = message.AgentName ?? thread.PendingAgentName, + PendingModelName = message.ModelName ?? thread.PendingModelName, + PendingContextPath = message.ContextPath ?? thread.PendingContextPath, + PendingAttachments = message.Attachments ?? thread.PendingAttachments + } + }; + }); + + return msgId; + } +} diff --git a/src/MeshWeaver.AI/ThreadLayoutAreas.cs b/src/MeshWeaver.AI/ThreadLayoutAreas.cs index 6c870330f..81ac18aa1 100644 --- a/src/MeshWeaver.AI/ThreadLayoutAreas.cs +++ b/src/MeshWeaver.AI/ThreadLayoutAreas.cs @@ -45,6 +45,7 @@ public static MessageHubConfiguration AddThreadLayoutAreas(this MessageHubConfig .WithView(ThreadNodeType.ThreadChatArea, ThreadChatView) .WithView(ThreadNodeType.StreamingArea, StreamingView) .WithView(ThreadNodeType.HistoryArea, HistoryView) + .WithView(ThreadNodeType.HeaderArea, HeaderView) .WithView(MeshNodeLayoutAreas.ThumbnailArea, Thumbnail) .WithView(MeshNodeLayoutAreas.ThreadsArea, ThreadsCatalog)); @@ -518,4 +519,301 @@ private static UiControl ThreadsView(LayoutAreaHost host, RenderingContext _) .WithRenderMode(MeshSearchRenderMode.Flat) .WithCreateNodeType("Thread"); } + + /// + /// Header area shown above the chat. Renders, when applicable: + /// • A back-link to the parent thread (if this is a delegation sub-thread — + /// detected by path nesting under another thread's response message id). + /// • A summary of nodes modified during this thread's runs, aggregated across + /// every entry, with version-before/ + /// version-after. Each entry links to the node's Versions area where the + /// existing compare/restore UI lives. + /// Pure subscription on the thread MeshNode; no awaits, no QueryAsync. + /// + public static IObservable HeaderView(LayoutAreaHost host, RenderingContext _) + { + var threadPath = host.Hub.Address.ToString(); + var parentLink = TryBuildParentLink(threadPath); + + var stream = host.Workspace.GetStream(new MeshNodeReference()); + if (stream is null) + return Observable.Return(parentLink); + + // Emit an immediate starting value so the LayoutAreaView never shows a skeleton + // for this area — the header is ancillary and should never block the chat view. + // Subsequent emissions fold in the aggregated UpdatedNodes summary. + var initial = Observable.Return(BuildHeader(parentLink, ImmutableList.Empty, threadPath)); + + var aggregated = stream + .Select(change => (change.Value?.Content as MeshThread)?.Messages ?? ImmutableList.Empty) + .Where(ids => ids.Count > 0) + .Select(ids => (ids, key: string.Join("|", ids))) + .DistinctUntilChanged(p => p.key) + .Select(p => CollectUpdatedNodes(host.Hub, threadPath, p.ids)) + .Switch() + .Select(updates => BuildHeader(parentLink, updates, threadPath)); + + return initial.Concat(aggregated); + } + + /// + /// Walks , requests each satellite ThreadMessage via + /// GetDataRequest (Post + RegisterCallback wrapped as an Observable), accumulates + /// their UpdatedNodes, and emits the aggregated list once all responses arrive. + /// + private static IObservable> CollectUpdatedNodes( + IMessageHub hub, string threadPath, ImmutableList messageIds) + { + if (messageIds.IsEmpty) return Observable.Return(ImmutableList.Empty); + + var subjects = messageIds.Select(id => + { + var subject = new System.Reactive.Subjects.AsyncSubject>(); + var del = hub.Post(new GetDataRequest(new MeshNodeReference()), + o => o.WithTarget(new Address($"{threadPath}/{id}"))); + if (del is null) + { + subject.OnNext(ImmutableList.Empty); + subject.OnCompleted(); + } + else + { + hub.RegisterCallback((IMessageDelivery)del, resp => + { + var msg = resp is IMessageDelivery gdr + ? (gdr.Message.Data as MeshNode)?.Content as ThreadMessage + : null; + subject.OnNext(msg?.UpdatedNodes ?? ImmutableList.Empty); + subject.OnCompleted(); + return resp; + }); + } + return subject.AsObservable(); + }).ToList(); + + return Observable.CombineLatest(subjects) + .Select(parts => ThreadExecution.AggregateNodeChanges( + parts.SelectMany(p => p).ToImmutableList())) + .Take(1) + .Timeout(TimeSpan.FromSeconds(5)) + .Catch, Exception>(_ => + Observable.Return(ImmutableList.Empty)); + } + + private static UiControl? TryBuildParentLink(string threadPath) + { + // Sub-thread paths nest under a parent response message: + // {parentThreadPath}/{parentResponseMsgId}/{thisThreadId} + // If we can find a ".../<8-hex-id>/" pattern we treat this as a delegation. + var segments = threadPath.Split('/'); + if (segments.Length < 3) return null; + var parentMsgId = segments[^2]; + if (parentMsgId.Length != 8) return null; + var parentThreadPath = string.Join('/', segments[..^2]); + if (string.IsNullOrEmpty(parentThreadPath)) return null; + + var encoded = System.Web.HttpUtility.HtmlEncode(parentThreadPath); + var html = + $"" + + $" Delegated from {encoded}"; + return Controls.Html(html); + } + + private static UiControl? BuildHeader(UiControl? parentLink, ImmutableList updates, string threadPath) + { + if (parentLink is null && updates.IsEmpty) return null; + + var stack = Controls.Stack + .WithStyle("gap:6px; padding:8px 12px; margin-bottom:8px; " + + "background:var(--neutral-layer-1); border:1px solid var(--neutral-stroke-rest); " + + "border-radius:8px;"); + + if (parentLink is not null) + stack = stack.WithView(parentLink); + + if (!updates.IsEmpty) + stack = stack.WithView(Controls.Html(BuildModifiedNodesHtml(updates, threadPath))); + + return stack; + } + + /// + /// Git-like panel for the aggregated UpdatedNodes list: + /// - Tabular layout (CSS grid): path · old-ver · → · new-ver · Diff · Restore v{old} · Restore v{new}. + /// - All action links visible inline on each row (no hidden ⋯ menu). + /// - Theme-safe colours that work in dark + light mode (no white-on-light-blue). + /// - Paths rendered relative to the thread's parent namespace so the most + /// interesting segment (leaf) stays visible when the table narrows. + /// - On screens < 720 px the whole section collapses behind a summary row. + /// + private static string BuildModifiedNodesHtml(ImmutableList updates, string threadPath) + { + // Derive the ancestor prefix we can strip from each node path to produce a + // shorter display form. For a thread at "Org/_Thread/abc", the interesting + // prefix to strip is "Org/" (the thread's root namespace, above _Thread). + var threadIdx = threadPath.IndexOf("/_Thread/", StringComparison.Ordinal); + var shortenPrefix = threadIdx > 0 ? threadPath[..(threadIdx + 1)] : null; + + static string Shorten(string path, string? prefix) => + prefix is not null && path.StartsWith(prefix, StringComparison.Ordinal) + ? path[prefix.Length..] + : path; + + var sb = new System.Text.StringBuilder(); + + // Inline + """); + + sb.Append("
"); + sb.Append($" Modified nodes ({updates.Count})"); + + sb.Append("
"); + + foreach (var entry in updates) + { + var path = entry.Path; + var pathEnc = System.Web.HttpUtility.HtmlEncode(path); + var displayEnc = System.Web.HttpUtility.HtmlEncode(Shorten(path, shortenPrefix)); + var op = entry.Operation ?? ""; + + sb.Append("
"); + + // Column 1: path (truncates on narrow layouts) + sb.Append( + $"{displayEnc}"); + + // Column 2: old version chip or "new" marker + if (entry.VersionBefore is { } vb) + sb.Append( + $"v{vb}"); + else if (op.Equals("Created", StringComparison.OrdinalIgnoreCase)) + sb.Append("new"); + else + sb.Append(""); + + // Column 3: arrow + sb.Append(""); + + // Column 4: new version chip or "deleted" marker + if (entry.VersionAfter is { } va) + sb.Append( + $"v{va}"); + else if (op.Equals("Deleted", StringComparison.OrdinalIgnoreCase)) + sb.Append("deleted"); + else + sb.Append(""); + + // Column 5: Diff (old ↔ new) — points to VersionDiff with from/to params. + if (entry.VersionBefore.HasValue && entry.VersionAfter.HasValue) + sb.Append( + $"Diff"); + else + sb.Append(""); + + // Column 6: Restore to old — opens VersionDiff (which has the Restore button). + if (entry.VersionBefore.HasValue) + sb.Append( + $"Restore v{entry.VersionBefore.Value}"); + else + sb.Append(""); + + // Column 7: Restore to new — opens VersionDiff (which has the Restore button). + if (entry.VersionAfter.HasValue) + sb.Append( + $"Restore v{entry.VersionAfter.Value}"); + else + sb.Append(""); + + sb.Append("
"); + } + + sb.Append("
"); + sb.Append("
"); + return sb.ToString(); + } } diff --git a/src/MeshWeaver.AI/ThreadMessageLayoutAreas.cs b/src/MeshWeaver.AI/ThreadMessageLayoutAreas.cs index 1e07c28ca..66b72033b 100644 --- a/src/MeshWeaver.AI/ThreadMessageLayoutAreas.cs +++ b/src/MeshWeaver.AI/ThreadMessageLayoutAreas.cs @@ -119,15 +119,24 @@ private static IMessageDelivery HandleUpdateContent( hub.GetWorkspace().UpdateMeshNode(node => { var current = node.Content as ThreadMessage ?? new ThreadMessage { Role = "assistant", Text = "" }; + // Prefer incremental append (TextDelta). Full Text replacement is only used + // for final/terminal writes (completion, error, cancel markers). + var newText = msg.Text ?? (msg.TextDelta is { Length: > 0 } d + ? (current.Text ?? "") + d + : current.Text); return node with { Content = current with { - Text = msg.Text ?? current.Text, + Text = newText, ToolCalls = msg.ToolCalls ?? current.ToolCalls, UpdatedNodes = msg.UpdatedNodes ?? current.UpdatedNodes, AgentName = msg.AgentName ?? current.AgentName, - ModelName = msg.ModelName ?? current.ModelName + ModelName = msg.ModelName ?? current.ModelName, + InputTokens = msg.InputTokens ?? current.InputTokens, + OutputTokens = msg.OutputTokens ?? current.OutputTokens, + TotalTokens = msg.TotalTokens ?? current.TotalTokens, + CompletedAt = msg.CompletedAt ?? current.CompletedAt } }; }); @@ -290,6 +299,7 @@ private static UiControl BuildMessageOverview( .WithTimestamp(msg.Timestamp) .WithText(new JsonPointerReference($"{dataPointer}/text")) .WithToolCalls(new JsonPointerReference($"{dataPointer}/toolCalls")) + .WithUpdatedNodes(new JsonPointerReference($"{dataPointer}/updatedNodes")) .WithThreadPath(threadPath) .WithMessageId(messageId); @@ -352,56 +362,115 @@ private static UiControl BuildMessageOverview( .WithStyle(isUser ? "align-items: flex-end;" : "") .WithView(bubble); - // For assistant messages: show delegation sub-threads as clickable links + // Reactive metadata row for assistant cells: model · duration · tokens. + // Re-renders whenever CompletedAt or token fields change on the underlying message. if (!isUser) { - var messagePath = $"{threadPath}/{messageId}"; container = container.WithView((h, c) => { - var meshService = h.Hub.ServiceProvider.GetService(); - if (meshService == null) return Observable.Return(null); - - return Observable.FromAsync(async () => - { - try - { - var subs = await meshService - .QueryAsync($"namespace:{messagePath} nodeType:{ThreadNodeType.NodeType}") - .ToListAsync(); - if (subs.Count == 0) return (UiControl?)null; - return (UiControl?)BuildDelegationLinks(subs); - } - catch { return (UiControl?)null; } - }); + var stream = h.Workspace.GetStream(new MeshNodeReference()); + if (stream is null) return Observable.Return(null); + + return stream + .Select(change => change.Value?.Content as ThreadMessage) + .Where(m => m is not null) + .Select(m => ( + Started: m!.Timestamp, + Completed: m.CompletedAt, + Model: m.ModelName, + In: m.InputTokens, + Out: m.OutputTokens, + Total: m.TotalTokens)) + .DistinctUntilChanged() + .Select(meta => BuildAssistantMetaRow(meta.Started, meta.Completed, meta.Model, + meta.In, meta.Out, meta.Total)); }); } + // Delegation sub-threads are already shown inline inside the bubble (via the bubble's + // tool-calls data binding). An extra embedded LayoutAreaControl here produced a + // duplicate line with the same "Delegating to …" chip — redundant, so removed. + // To see full sub-thread progress, click through the delegation chip inside the bubble. + container = container.WithView(actionRow); return container; } /// - /// Builds simple navigation links for delegation sub-threads. - /// Each sub-thread is rendered as a clickable link showing its name. + /// Formats a TimeSpan as compact h/m/s: e.g. "120ms", "1.8s", "42s", "1m 23s", + /// "1h 5m 30s". Zero components are dropped. /// - private static UiControl BuildDelegationLinks(IReadOnlyList subThreads) + private static string FormatDurationHms(TimeSpan d) { - var sb = new System.Text.StringBuilder(); - sb.Append("
"); + if (d.TotalMilliseconds < 1000) return $"{d.TotalMilliseconds:F0}ms"; + if (d.TotalSeconds < 10) return $"{d.TotalSeconds:F1}s"; + var parts = new List(); + var h = (int)d.TotalHours; + if (h > 0) parts.Add($"{h}h"); + var m = d.Minutes; + if (m > 0 || h > 0) parts.Add($"{m}m"); + parts.Add($"{d.Seconds}s"); + return string.Join(' ', parts); + } - foreach (var st in subThreads) + /// + /// Builds the muted one-line metadata row shown below an assistant cell: + /// HH:mm:ss · model · 1.8s · 1,247 in / 392 out (1,639 total). Returns + /// null when there's nothing to show (e.g. response still streaming and no model + /// yet known). + /// + private static UiControl? BuildAssistantMetaRow( + DateTime started, DateTime? completed, string? model, + int? input, int? output, int? total) + { + var parts = new List(); + parts.Add(started.ToLocalTime().ToString("HH:mm:ss")); + if (!string.IsNullOrEmpty(model)) + parts.Add(System.Web.HttpUtility.HtmlEncode(model)); + if (completed.HasValue) + { + parts.Add(FormatDurationHms(completed.Value - started)); + } + if (input.HasValue || output.HasValue || total.HasValue) { - var name = System.Web.HttpUtility.HtmlEncode( - st.Name?.Length > 80 ? st.Name[..77] + "..." : st.Name ?? st.Id); - var href = $"/{st.Path}"; + var inS = input?.ToString("N0") ?? "?"; + var outS = output?.ToString("N0") ?? "?"; + var totS = total?.ToString("N0"); + var tokens = totS is null + ? $"{inS} in / {outS} out" + : $"{inS} in / {outS} out ({totS} total)"; + parts.Add(tokens); + } + if (parts.Count == 0) return null; + + var line = string.Join(" · ", parts); + return Controls.Html( + $"
{line}
"); + } - sb.Append($"" + - $" {name}"); + /// + /// Builds an embedded stack of s pointing at the + /// sub-thread's compact Streaming view. Each control opens its own + /// subscription against the sub-thread hub — the parent's execution loop never + /// reads or awaits the sub-thread's stream. Returns null when there are no + /// delegations. + /// + private static UiControl? BuildEmbeddedSubThreadAreas(IReadOnlyList subThreadPaths) + { + if (subThreadPaths.Count == 0) return null; + + var stack = Controls.Stack + .WithStyle("gap: 6px; margin: 6px 0 4px 8px; padding-left: 8px; " + + "border-left: 2px solid var(--accent-fill-rest);"); + + foreach (var path in subThreadPaths) + { + var area = new LayoutAreaControl(path, new LayoutAreaReference("Streaming")) + .WithSpinnerType(SpinnerType.Skeleton); + stack = stack.WithView(area); } - sb.Append("
"); - return Controls.Html(sb.ToString()); + return stack; } /// @@ -426,8 +495,9 @@ private static string ConvertReferencesToLinks(string text) // Don't convert email addresses if (path.Contains('@')) return match.Value; - // Use @prefix in href — LinkUrlCleanupExtension will strip @ and resolve - return $"[`@{path}`](@{path})"; + // Emit absolute href — @references from agents are always full paths + var href = path.StartsWith('/') ? path : $"/{path}"; + return $"[`@{path}`]({href})"; }); } diff --git a/src/MeshWeaver.AI/ThreadMessageViewModel.cs b/src/MeshWeaver.AI/ThreadMessageViewModel.cs index be8668ed9..edf290514 100644 --- a/src/MeshWeaver.AI/ThreadMessageViewModel.cs +++ b/src/MeshWeaver.AI/ThreadMessageViewModel.cs @@ -16,6 +16,12 @@ public record ThreadMessageViewModel public string? Timestamp { get; init; } public string Text { get; init; } = ""; public ImmutableList ToolCalls { get; init; } = []; + /// + /// Nodes this message's execution created / updated / deleted. The bubble cross- + /// references tool-call target paths against this list to render inline Diff and + /// Restore links next to each "Creating / Updating / Deleting X" chip. + /// + public ImmutableList UpdatedNodes { get; init; } = []; public static ThreadMessageViewModel FromMessage(ThreadMessage msg) => new() { @@ -24,7 +30,8 @@ public record ThreadMessageViewModel ModelName = msg.ModelName, Timestamp = msg.Timestamp.ToString("HH:mm:ss"), Text = msg.Text ?? "", - ToolCalls = msg.ToolCalls + ToolCalls = msg.ToolCalls, + UpdatedNodes = msg.UpdatedNodes }; public virtual bool Equals(ThreadMessageViewModel? other) @@ -35,7 +42,8 @@ public virtual bool Equals(ThreadMessageViewModel? other) && AuthorName == other.AuthorName && ModelName == other.ModelName && Text == other.Text - && ToolCalls.SequenceEqual(other.ToolCalls); + && ToolCalls.SequenceEqual(other.ToolCalls) + && UpdatedNodes.SequenceEqual(other.UpdatedNodes); } public override int GetHashCode() @@ -44,6 +52,7 @@ public override int GetHashCode() hash.Add(Role); hash.Add(Text); hash.Add(ToolCalls.Count); + hash.Add(UpdatedNodes.Count); return hash.ToHashCode(); } } diff --git a/src/MeshWeaver.AI/ThreadNodeType.cs b/src/MeshWeaver.AI/ThreadNodeType.cs index e82720c69..c1e5e979e 100644 --- a/src/MeshWeaver.AI/ThreadNodeType.cs +++ b/src/MeshWeaver.AI/ThreadNodeType.cs @@ -18,6 +18,14 @@ public static class ThreadNodeType /// public const string NodeType = "Thread"; + /// + /// Default Icon for Thread instances. Must match . + /// Applied explicitly in / + /// because thread creation goes via CreateNodeRequest on arbitrary parent hubs, + /// some of which don't have INodeTypeService registered to auto-copy the icon. + /// + public const string DefaultIcon = "/static/NodeTypeIcons/chat.svg"; + /// /// Satellite partition name for threads (like _Comment for comments). /// Threads are created at {contextPath}/_Thread/{speakingId}. @@ -46,6 +54,13 @@ public static class ThreadNodeType /// public const string HistoryArea = "History"; + /// + /// Layout area shown above the chat: parent-thread origin link (when this thread + /// is a delegation), aggregated list of nodes modified by this thread's execution + /// with version-before / version-after, and click-through to the version compare view. + /// + public const string HeaderArea = "Header"; + /// /// Generates a human-readable speaking ID from message text. /// Takes the first few words, lowercases, replaces non-alphanumeric with hyphens, @@ -91,6 +106,7 @@ public static MeshNode BuildThreadNode(string contextPath, string messageText, s { Name = name, NodeType = NodeType, + Icon = DefaultIcon, MainNode = contextPath, Content = new Thread { CreatedBy = createdBy } }; @@ -124,6 +140,7 @@ public static (MeshNode Thread, string UserMsgId, string ResponseMsgId) BuildThr { Name = name, NodeType = NodeType, + Icon = DefaultIcon, MainNode = contextPath, Content = new Thread { @@ -175,7 +192,7 @@ public static MeshNode CreateMeshNode( Func? hubConfiguration = null) => new(NodeType) { Name = "Thread", - Icon = "/static/NodeTypeIcons/chat.svg", + Icon = DefaultIcon, IsSatelliteType = true, ExcludeFromContext = ImmutableHashSet.Create("search"), AssemblyLocation = typeof(ThreadNodeType).Assembly.Location, diff --git a/src/MeshWeaver.AI/ThreadSubmission.cs b/src/MeshWeaver.AI/ThreadSubmission.cs index aa367e59a..f7fd6a6bc 100644 --- a/src/MeshWeaver.AI/ThreadSubmission.cs +++ b/src/MeshWeaver.AI/ThreadSubmission.cs @@ -75,18 +75,113 @@ public static ImmutableList FindUnprocessedUserMessages(MeshThread threa // ═════════════════════════════════════════════════════════════════════ /// - /// Submits a user message into an existing thread. Fire-and-forget; the caller - /// observes the new user cell appear through the thread's remote MeshNode stream. + /// Submits a user message into an existing thread. Posts a single + /// to the thread hub — the handler + /// runs locally (one atomic + /// workspace.UpdateMeshNode), and the server watcher then creates the + /// satellite cell and dispatches the round. No separate CreateNodeRequest from + /// the client — that was the duplicate-dispatch source in the legacy flow. /// public static void Submit(SubmitContext ctx) - => ThreadSubmissionClient.Submit(ctx); + { + if (string.IsNullOrEmpty(ctx.ThreadPath)) + { + ctx.OnError?.Invoke("Submit requires ThreadPath. Use CreateThreadAndSubmit for new threads."); + return; + } + + var delivery = ctx.Hub.Post( + new AppendUserMessageRequest + { + ThreadPath = ctx.ThreadPath!, + UserMessageId = Guid.NewGuid().ToString("N")[..8], // ignored by handler — kept for back-compat shape + UserText = ctx.UserText, + AgentName = ctx.AgentName, + ModelName = ctx.ModelName, + ContextPath = ctx.ContextPath, + Attachments = ctx.Attachments + }, + o => o.WithTarget(new Address(ctx.ThreadPath!))); + + if (delivery == null) + { + ctx.OnError?.Invoke("Hub.Post returned null"); + return; + } + + ctx.Hub.RegisterCallback((IMessageDelivery)delivery, response => + { + if (response is IMessageDelivery { Message.Success: false } fail) + ctx.OnError?.Invoke($"Submit failed: {fail.Message.Error ?? "unknown"}"); + return response; + }); + } /// - /// Creates a new thread node and submits the first user message. - /// fires when the thread node is confirmed so the caller can navigate immediately. + /// Creates a new thread node, then submits the first user message via + /// on the new thread. + /// fires when the thread is confirmed. /// public static void CreateThreadAndSubmit(SubmitContext ctx) - => ThreadSubmissionClient.CreateThreadAndSubmit(ctx); + { + if (string.IsNullOrEmpty(ctx.Namespace)) + { + ctx.OnError?.Invoke("CreateThreadAndSubmit requires Namespace."); + return; + } + + var threadNode = ThreadNodeType.BuildThreadNode(ctx.Namespace!, ctx.UserText, ctx.CreatedBy); + var fallbackPath = threadNode.Path!; + + var delivery = ctx.Hub.Post( + new CreateNodeRequest(threadNode), + o => o.WithTarget(new Address(ctx.Namespace!))); + + if (delivery == null) + { + ctx.OnError?.Invoke("Hub.Post returned null"); + return; + } + + ctx.Hub.RegisterCallback((IMessageDelivery)delivery, response => + { + if (response is not IMessageDelivery { Message.Success: true } cnr) + { + var err = (response as IMessageDelivery)?.Message.Error ?? "unknown"; + ctx.OnError?.Invoke($"Thread creation failed: {err}"); + return response; + } + + var createdNode = cnr.Message.Node ?? threadNode; + var createdPath = createdNode.Path ?? fallbackPath; + ctx.OnThreadCreated?.Invoke(createdNode); + + var append = ctx.Hub.Post( + new AppendUserMessageRequest + { + ThreadPath = createdPath, + UserMessageId = Guid.NewGuid().ToString("N")[..8], + UserText = ctx.UserText, + AgentName = ctx.AgentName, + ModelName = ctx.ModelName, + ContextPath = ctx.ContextPath, + Attachments = ctx.Attachments + }, + o => o.WithTarget(new Address(createdPath))); + + if (append != null) + { + ctx.Hub.RegisterCallback((IMessageDelivery)append, appendResp => + { + if (appendResp is IMessageDelivery { Message.Success: false } fail) + ctx.OnError?.Invoke($"Append after thread create failed: {fail.Message.Error ?? "unknown"}"); + return appendResp; + }); + } + + return response; + }); + } /// /// Resubmits an existing user message: truncates Messages and IngestedMessageIds @@ -94,7 +189,37 @@ public static void CreateThreadAndSubmit(SubmitContext ctx) /// creates a new output cell. /// public static void Resubmit(ResubmitContext ctx) - => ThreadSubmissionClient.Resubmit(ctx); + { + if (string.IsNullOrEmpty(ctx.ThreadPath) || string.IsNullOrEmpty(ctx.UserMessageIdToReplay)) + { + ctx.OnError?.Invoke("Resubmit requires ThreadPath and UserMessageIdToReplay."); + return; + } + + var delivery = ctx.Hub.Post( + new ResubmitUserMessageRequest + { + ThreadPath = ctx.ThreadPath, + UserMessageId = ctx.UserMessageIdToReplay, + NewUserText = ctx.NewUserText, + AgentName = ctx.AgentName, + ModelName = ctx.ModelName + }, + o => o.WithTarget(new Address(ctx.ThreadPath))); + + if (delivery == null) + { + ctx.OnError?.Invoke("Hub.Post returned null"); + return; + } + + ctx.Hub.RegisterCallback((IMessageDelivery)delivery, response => + { + if (response is IMessageDelivery { Message.Success: false } fail) + ctx.OnError?.Invoke($"Resubmit failed: {fail.Message.Error ?? "unknown"}"); + return response; + }); + } // ═════════════════════════════════════════════════════════════════════ // Server-side API — invoked from thread hub initialization @@ -113,40 +238,36 @@ public static IDisposable InstallServerWatcher(IMessageHub threadHub) // ═════════════════════════════════════════════════════════════════════ /// - /// Thread-hub handler: registers a new user message id on the thread, stores Pending*, - /// and lets the watcher dispatch. Runs on the thread hub's scheduler — only one - /// AppendUserMessageRequest is processed at a time, so the state update is atomic - /// and patch-safe. + /// Thread-hub handler kept as a back-compat shim: re-routes legacy + /// through the new + /// path. New callers should write directly to the thread's MeshNode via ThreadInput + /// instead of posting this request. /// public static IMessageDelivery HandleAppendUserMessage( IMessageHub hub, IMessageDelivery delivery) { var req = delivery.Message; - hub.GetWorkspace().UpdateMeshNode(node => + try { - var t = node.Content as MeshThread ?? new MeshThread(); - var msgs = t.Messages.Contains(req.UserMessageId) ? t.Messages : t.Messages.Add(req.UserMessageId); - var userIds = t.UserMessageIds.Contains(req.UserMessageId) ? t.UserMessageIds : t.UserMessageIds.Add(req.UserMessageId); - // Accumulate queued text into PendingUserMessage. DispatchRound reads and clears it. - var pending = string.IsNullOrEmpty(t.PendingUserMessage) - ? req.UserText - : $"{t.PendingUserMessage}\n\n---\n\n{req.UserText}"; - return node with - { - Content = t with - { - Messages = msgs, - UserMessageIds = userIds, - PendingUserMessage = pending, - PendingAgentName = req.AgentName ?? t.PendingAgentName, - PendingModelName = req.ModelName ?? t.PendingModelName, - PendingContextPath = req.ContextPath ?? t.PendingContextPath, - PendingAttachments = req.Attachments?.ToImmutableList() ?? t.PendingAttachments - } - }; - }); - hub.Post(new AppendUserMessageResponse { Success = true }, o => o.ResponseFor(delivery)); + var msg = ThreadInput.CreateUserMessage( + req.UserText, + createdBy: delivery.AccessContext?.ObjectId, + authorName: null, + agentName: req.AgentName, + modelName: req.ModelName, + contextPath: req.ContextPath, + attachments: req.Attachments); + // Note: this shim ignores req.UserMessageId — the new flow allocates its own. + // Tests + the legacy client posted the id eagerly; the new flow only uses + // server-allocated ids so we don't honour the request's id here. + ThreadInput.AppendUserInput(hub.GetWorkspace(), req.ThreadPath, msg); + hub.Post(new AppendUserMessageResponse { Success = true }, o => o.ResponseFor(delivery)); + } + catch (Exception ex) + { + hub.Post(new AppendUserMessageResponse { Success = false, Error = ex.Message }, o => o.ResponseFor(delivery)); + } return delivery.Processed(); } @@ -340,235 +461,6 @@ public sealed record RoundDispatch( string? ContextPath, IReadOnlyList? Attachments); -/// -/// Client-side submission logic. All methods are void / fire-and-forget. -/// The client only posts CreateNodeRequest — the server watcher does all Thread-state bookkeeping -/// (append to Messages/UserMessageIds, set Pending*, dispatch). This avoids remote-stream write -/// races that produce out-of-bounds JSON patches. -/// -internal static class ThreadSubmissionClient -{ - private static string NewId() => Guid.NewGuid().ToString("N")[..8]; - - public static void Submit(SubmitContext ctx) - { - if (string.IsNullOrEmpty(ctx.ThreadPath)) - { - ctx.OnError?.Invoke("Submit requires ThreadPath. Use CreateThreadAndSubmit for new threads."); - return; - } - - var userMsgId = NewId(); - var threadAddr = new Address(ctx.ThreadPath); - var userCell = BuildUserCell(userMsgId, ctx.ThreadPath, ctx); - - void ReportFailure(string reason) - { - PostFailureRecord(ctx.Hub, ctx.ThreadPath!, userMsgId, ctx.UserText, reason); - ctx.OnError?.Invoke(reason); - } - - // 1) Create the user cell. - var createDelivery = ctx.Hub.Post( - new CreateNodeRequest(userCell), - o => o.WithTarget(threadAddr)); - - if (createDelivery != null) - { - ctx.Hub.RegisterCallback((IMessageDelivery)createDelivery, response => - { - if (response is IMessageDelivery { Message.Success: false } fail) - ReportFailure($"User cell creation failed: {fail.Message.Error ?? "unknown"}"); - return response; - }); - } - - // 2) Tell the thread hub to register the id and queue it. - var appendDelivery = ctx.Hub.Post( - new AppendUserMessageRequest - { - ThreadPath = ctx.ThreadPath, - UserMessageId = userMsgId, - UserText = ctx.UserText, - AgentName = ctx.AgentName, - ModelName = ctx.ModelName, - ContextPath = ctx.ContextPath, - Attachments = ctx.Attachments - }, - o => o.WithTarget(threadAddr)); - - if (appendDelivery != null) - { - ctx.Hub.RegisterCallback((IMessageDelivery)appendDelivery, response => - { - if (response is IMessageDelivery { Message.Success: false } fail) - ReportFailure($"Append failed: {fail.Message.Error ?? "unknown"}"); - return response; - }); - } - } - - public static void CreateThreadAndSubmit(SubmitContext ctx) - { - if (string.IsNullOrEmpty(ctx.Namespace)) - { - ctx.OnError?.Invoke("CreateThreadAndSubmit requires Namespace."); - return; - } - - var userMsgId = NewId(); - - // Build an empty thread node. The server watcher will populate Messages/UserMessageIds - // once the user cell is created. - var threadNode = ThreadNodeType.BuildThreadNode(ctx.Namespace, ctx.UserText, ctx.CreatedBy); - var threadPath = threadNode.Path!; - var userCell = BuildUserCell(userMsgId, threadPath, ctx); - - var delivery = ctx.Hub.Post( - new CreateNodeRequest(threadNode), - o => o.WithTarget(new Address(ctx.Namespace))); - - if (delivery == null) - { - ctx.OnError?.Invoke("Hub.Post returned null"); - return; - } - - ctx.Hub.RegisterCallback((IMessageDelivery)delivery, response => - { - if (response is not IMessageDelivery { Message.Success: true } cnr) - { - var err = (response as IMessageDelivery)?.Message.Error ?? "unknown"; - ctx.OnError?.Invoke($"Thread creation failed: {err}"); - return response; - } - - var createdNode = cnr.Message.Node ?? threadNode; - var createdPath = createdNode.Path ?? threadPath; - ctx.OnThreadCreated?.Invoke(createdNode); - - var threadAddr = new Address(createdPath); - - // Create the user cell on the new thread. - var cellDelivery = ctx.Hub.Post( - new CreateNodeRequest(userCell), - o => o.WithTarget(threadAddr)); - - if (cellDelivery is not null) - { - ctx.Hub.RegisterCallback((IMessageDelivery)cellDelivery, cellResp => - { - if (cellResp is IMessageDelivery { Message.Success: false } cellFail) - ctx.OnError?.Invoke($"User cell creation failed: {cellFail.Message.Error ?? "unknown"}"); - return cellResp; - }); - } - - // Tell the thread hub to register the id + queue it. - var appendDelivery = ctx.Hub.Post( - new AppendUserMessageRequest - { - ThreadPath = createdPath, - UserMessageId = userMsgId, - UserText = ctx.UserText, - AgentName = ctx.AgentName, - ModelName = ctx.ModelName, - ContextPath = ctx.ContextPath, - Attachments = ctx.Attachments - }, - o => o.WithTarget(threadAddr)); - - if (appendDelivery is not null) - { - ctx.Hub.RegisterCallback((IMessageDelivery)appendDelivery, appendResp => - { - if (appendResp is IMessageDelivery { Message.Success: false } fail) - ctx.OnError?.Invoke($"Append failed: {fail.Message.Error ?? "unknown"}"); - return appendResp; - }); - } - - return response; - }); - } - - public static void Resubmit(ResubmitContext ctx) - { - if (string.IsNullOrEmpty(ctx.ThreadPath) || string.IsNullOrEmpty(ctx.UserMessageIdToReplay)) - { - ctx.OnError?.Invoke("Resubmit requires ThreadPath and UserMessageIdToReplay."); - return; - } - - var delivery = ctx.Hub.Post( - new ResubmitUserMessageRequest - { - ThreadPath = ctx.ThreadPath, - UserMessageId = ctx.UserMessageIdToReplay, - NewUserText = ctx.NewUserText, - AgentName = ctx.AgentName, - ModelName = ctx.ModelName - }, - o => o.WithTarget(new Address(ctx.ThreadPath))); - - if (delivery == null) - { - ctx.OnError?.Invoke("Hub.Post returned null"); - return; - } - - ctx.Hub.RegisterCallback((IMessageDelivery)delivery, response => - { - if (response is IMessageDelivery { Message.Success: false } fail) - ctx.OnError?.Invoke($"Resubmit failed: {fail.Message.Error ?? "unknown"}"); - return response; - }); - } - - /// - /// Fire-and-forget post of a so the thread - /// shows the failure as an error response cell. If this post also fails, we've exhausted - /// recovery — swallow silently (the OnError callback is still invoked separately). - /// - private static void PostFailureRecord( - IMessageHub hub, string threadPath, string userMsgId, string userText, string error) - { - try - { - hub.Post( - new RecordSubmissionFailureRequest - { - ThreadPath = threadPath, - UserMessageId = userMsgId, - UserText = userText, - ErrorMessage = error - }, - o => o.WithTarget(new Address(threadPath))); - } - catch { /* swallow — caller's OnError will still fire */ } - } - - private static MeshNode BuildUserCell(string userMsgId, string threadPath, SubmitContext ctx) - => new(userMsgId, threadPath) - { - NodeType = ThreadMessageNodeType.NodeType, - MainNode = ctx.ContextPath ?? threadPath, - Content = new ThreadMessage - { - Role = "user", - AuthorName = ctx.AuthorName, - Text = ctx.UserText, - Timestamp = DateTime.UtcNow, - Type = ThreadMessageType.ExecutedInput, - CreatedBy = ctx.CreatedBy, - AgentName = ctx.AgentName, - ModelName = ctx.ModelName, - ContextPath = ctx.ContextPath, - Attachments = ctx.Attachments - } - }; -} - /// /// Server-side watcher: observes thread state changes and dispatches execution rounds. /// Installed once on thread hub initialization. Non-blocking; uses only Post + RegisterCallback @@ -583,34 +475,49 @@ public static IDisposable InstallServerWatcher(IMessageHub threadHub) var threadPath = threadHub.Address.Path; // Reentrancy guard: 0=idle, 1=dispatching. - // Combined with the thread's IsExecuting flag, prevents double-dispatch - // between "start dispatching" and "IsExecuting=true visible on stream". + // Held until IsExecuting=true is observed back through the same stream, so a + // re-emission triggered by our own response-cell write or PendingUserMessages + // patch can't double-dispatch. var dispatching = 0; - var sub = workspace.GetStream() - ?.Subscribe(nodes => + // Subscribe to this thread's own MeshNode (via MeshNodeReference) instead of the + // collection-wide stream — fewer wakeups, and the patches we observe are exactly + // the writes against this thread. + // + // Throttle by a small window so a burst of rapid AppendUserMessageRequest patches + // (user submits 3 messages in quick succession, or the GUI batches submits) coalesce + // into a SINGLE dispatch with all the queued user ids in one round / one response + // cell. Without throttling each patch individually wins the reentrancy guard and + // produces one round per submit. + var sub = workspace.GetStream(new MeshNodeReference()) + ?.Throttle(TimeSpan.FromMilliseconds(50)) + ?.Subscribe(change => { - if (nodes == null) return; + var threadNode = change.Value; + if (threadNode?.Content is not MeshThread thread) return; + + // IsExecuting=true is visible — we held the guard waiting for this commit. + if (thread.IsExecuting && dispatching == 1) + { + Interlocked.Exchange(ref dispatching, 0); + return; + } + if (thread.IsExecuting) return; + if (Interlocked.CompareExchange(ref dispatching, 1, 0) != 0) return; + var releaseGuard = true; try { - var threadNode = nodes.FirstOrDefault(n => n.Path == threadPath); - if (threadNode?.Content is not MeshThread thread) return; - - // Queue-don't-cancel: if the thread is executing, do nothing. The queued - // user messages stay in UserMessageIds; as soon as IsExecuting flips to - // false (current round completed naturally), we dispatch the next round. - // This matches Claude Code / Anthropic's recommended pattern — the Messages - // API doesn't support mid-stream injection and cancelling during a tool_use - // produces orphaned blocks that need synthetic tool_result recovery. - if (thread.IsExecuting) return; - var dispatch = ThreadSubmission.PlanNextRound(thread); if (dispatch is null) return; - DispatchRound(threadHub, threadNode, dispatch, logger); + // Hold the guard. It will be released when we observe IsExecuting=true + // back on this same stream above (or on hard failure inside DispatchRound). + releaseGuard = false; + DispatchRound(threadHub, threadNode, dispatch, logger, + onFailure: () => Interlocked.Exchange(ref dispatching, 0)); } catch (Exception ex) { @@ -618,7 +525,7 @@ public static IDisposable InstallServerWatcher(IMessageHub threadHub) } finally { - Interlocked.Exchange(ref dispatching, 0); + if (releaseGuard) Interlocked.Exchange(ref dispatching, 0); } }); @@ -629,12 +536,17 @@ public static IDisposable InstallServerWatcher(IMessageHub threadHub) /// Creates the output cell, writes the committed round to the thread node, and /// fires off agent execution on the _Exec hosted hub. Non-blocking — all /// Hub.Post + RegisterCallback; the workspace write is a synchronous fire-and-forget. + /// + /// Step 0 (new): for each unprocessed user id present in , + /// create the satellite ThreadMessage cell. The client only writes the thread node; + /// the server materializes the per-message satellite nodes here. /// private static void DispatchRound( IMessageHub hub, MeshNode threadNode, RoundDispatch dispatch, - ILogger? logger) + ILogger? logger, + Action? onFailure = null) { var threadPath = hub.Address.Path; var responseMsgId = dispatch.ResponseMessageId; @@ -642,10 +554,6 @@ private static void DispatchRound( var thread = threadNode.Content as MeshThread ?? new MeshThread(); var mainEntity = threadNode.MainNode ?? dispatch.ContextPath ?? threadPath; - // PendingUserMessage contains the concatenated text of all user messages queued by - // AppendUserMessageRequest handlers since the last dispatch. - var combinedUserText = thread.PendingUserMessage ?? ""; - var accessService = hub.ServiceProvider.GetService(); var userCtx = accessService?.Context ?? accessService?.CircuitContext; if (userCtx is null && !string.IsNullOrEmpty(thread.CreatedBy)) @@ -653,100 +561,180 @@ private static void DispatchRound( userCtx = new AccessContext { ObjectId = thread.CreatedBy, Name = thread.CreatedBy }; } - // Step 1: create the assistant output cell (CreateNodeRequest → RegisterCallback). - var responseCell = new MeshNode(responseMsgId, threadPath) - { - NodeType = ThreadMessageNodeType.NodeType, - MainNode = mainEntity, - Content = new ThreadMessage - { - Role = "assistant", - Text = "", - Timestamp = DateTime.UtcNow, - Type = ThreadMessageType.AgentResponse, - AgentName = dispatch.AgentName, - ModelName = dispatch.ModelName - } - }; + var meshService = hub.ServiceProvider.GetRequiredService(); - var createDelivery = hub.Post( - new CreateNodeRequest(responseCell), - o => userCtx != null ? o.WithAccessContext(userCtx).WithTarget(hub.Address) : o.WithTarget(hub.Address)); + // Step 0: materialize user satellite cells from PendingUserMessages. + // Only ids present in dispatch.UserMessageIds AND PendingUserMessages need creation + // here — legacy paths (PendingUserMessage string) create cells elsewhere. + var pendingForRound = dispatch.UserMessageIds + .Where(id => thread.PendingUserMessages.ContainsKey(id)) + .Select(id => (Id: id, Msg: thread.PendingUserMessages[id])) + .ToImmutableList(); - if (createDelivery == null) - { - logger?.LogWarning("[ThreadSubmission] Post of CreateNodeRequest returned null for response cell {ResponseMsgId} on {ThreadPath}", - responseMsgId, threadPath); - return; - } + var combinedUserText = pendingForRound.Count > 0 + ? string.Join("\n\n---\n\n", pendingForRound.Select(p => p.Msg.Text)) + : (thread.PendingUserMessage ?? ""); - hub.RegisterCallback((IMessageDelivery)createDelivery, response => + void AfterUserCellsReady() { - if (response is not IMessageDelivery { Message.Success: true }) + // Step 1: create the assistant output cell (CreateNodeRequest → RegisterCallback). + var responseCell = new MeshNode(responseMsgId, threadPath) { - var err = (response as IMessageDelivery)?.Message.Error ?? "unknown"; - logger?.LogWarning("[ThreadSubmission] Response cell creation failed for {ResponseMsgId} on {ThreadPath}: {Error}", - responseMsgId, threadPath, err); - return response; + NodeType = ThreadMessageNodeType.NodeType, + MainNode = mainEntity, + Content = new ThreadMessage + { + Role = "assistant", + Text = "", + Timestamp = DateTime.UtcNow, + Type = ThreadMessageType.AgentResponse, + AgentName = dispatch.AgentName, + ModelName = dispatch.ModelName + } + }; + + var createDelivery = hub.Post( + new CreateNodeRequest(responseCell), + o => userCtx != null + ? o.WithAccessContext(userCtx).WithTarget(hub.Address) + : o.WithTarget(hub.Address)); + + if (createDelivery == null) + { + logger?.LogWarning("[ThreadSubmission] Post of CreateNodeRequest returned null for response cell {ResponseMsgId} on {ThreadPath}", + responseMsgId, threadPath); + onFailure?.Invoke(); + return; } - // Step 2: commit the round to the thread state (one atomic UpdateMeshNode). - hub.GetWorkspace().UpdateMeshNode(node => + hub.RegisterCallback((IMessageDelivery)createDelivery, response => { - var t = node.Content as MeshThread ?? new MeshThread(); - var msgs = t.Messages.Contains(responseMsgId) ? t.Messages : t.Messages.Add(responseMsgId); - var ingested = t.IngestedMessageIds; - foreach (var uid in dispatch.UserMessageIds) + if (response is not IMessageDelivery { Message.Success: true }) { - if (!ingested.Contains(uid)) - ingested = ingested.Add(uid); + var err = (response as IMessageDelivery)?.Message.Error ?? "unknown"; + logger?.LogWarning("[ThreadSubmission] Response cell creation failed for {ResponseMsgId} on {ThreadPath}: {Error}", + responseMsgId, threadPath, err); + onFailure?.Invoke(); + return response; } - return node with + + // Step 2: commit the round to the thread state (one atomic UpdateMeshNode). + // Both the user satellite cells (created above in the materialization step) + // and the response satellite cell (just confirmed in the CreateNodeRequest + // callback above) exist on the hub now. Only NOW do we add their ids into + // Messages — the GUI iterates Messages to render LayoutAreaControls, so + // every id it sees has a backing satellite. + // + // The IsExecuting check is the idempotency guard — every other watcher + // emission in this round skips, so this body runs exactly once per round. + hub.GetWorkspace().UpdateMeshNode(node => { - Content = t with + var t = node.Content as MeshThread ?? new MeshThread(); + if (t.IsExecuting) return node; + + // User ids in dispatch order, then the response id last. + // Contains check covers the resubmit case where u1 was already in + // Messages from a prior round — ApplyResubmit removed u1 from + // IngestedMessageIds (so the watcher re-dispatches it) but kept it + // in Messages, so a blind AddRange would duplicate it. + var msgs = t.Messages; + foreach (var uid in dispatch.UserMessageIds) + if (!msgs.Contains(uid)) msgs = msgs.Add(uid); + msgs = msgs.Add(responseMsgId); + + var ingested = t.IngestedMessageIds.AddRange( + dispatch.UserMessageIds.Where(uid => !t.IngestedMessageIds.Contains(uid))); + + // Drop consumed PendingUserMessages entries — their satellites now exist + // and their ids are now in Messages. + var pending = t.PendingUserMessages; + foreach (var (uid, _) in pendingForRound) + pending = pending.Remove(uid); + + return node with { - Messages = msgs, - IngestedMessageIds = ingested, - IsExecuting = true, - ActiveMessageId = responseMsgId, - ExecutionStartedAt = DateTime.UtcNow, - TokensUsed = 0, - ExecutionStatus = null, - // Clear PendingUserMessage — the round's text is already captured in combinedUserText. - // Next AppendUserMessageRequest starts accumulating fresh for the next round. - PendingUserMessage = null, - PendingContextPath = dispatch.ContextPath, - PendingAttachments = dispatch.Attachments?.ToImmutableList() - } - }; - }); + Content = t with + { + Messages = msgs, + IngestedMessageIds = ingested, + IsExecuting = true, + ActiveMessageId = responseMsgId, + ExecutionStartedAt = DateTime.UtcNow, + TokensUsed = 0, + ExecutionStatus = null, + PendingUserMessage = null, + PendingUserMessages = pending, + PendingContextPath = dispatch.ContextPath, + PendingAttachments = dispatch.Attachments?.ToImmutableList() + } + }; + }); - hub.Post( - new UpdateThreadMessageContent { Text = "Allocating agent..." }, - o => o.WithTarget(new Address(responsePath))); + hub.Post( + new UpdateThreadMessageContent { Text = "Allocating agent..." }, + o => o.WithTarget(new Address(responsePath))); - // Step 3: post to _Exec hosted hub — actual agent streaming runs there. - var executionHub = hub.GetHostedHub( - new Address($"{hub.Address}/_Exec"), - config => config.WithHandler(ThreadExecution.ExecuteMessageAsync), - HostedHubCreation.Always); + // Step 3: post to _Exec hosted hub — actual agent streaming runs there. + var executionHub = hub.GetHostedHub( + new Address($"{hub.Address}/_Exec"), + config => config.WithHandler(ThreadExecution.ExecuteMessageAsync), + HostedHubCreation.Always); - executionHub!.Post( - new SubmitMessageRequest + executionHub!.Post( + new SubmitMessageRequest + { + ThreadPath = threadPath, + UserMessageText = combinedUserText, + UserMessageId = dispatch.UserMessageIds.LastOrDefault(), + ResponseMessageId = responseMsgId, + ResponsePath = responsePath, + AgentName = dispatch.AgentName, + ModelName = dispatch.ModelName, + ContextPath = dispatch.ContextPath, + Attachments = dispatch.Attachments + }, + o => userCtx != null ? o.WithAccessContext(userCtx) : o); + + return response; + }); + } + + if (pendingForRound.Count == 0) + { + AfterUserCellsReady(); + return; + } + + // Materialize satellite cells in parallel, then proceed. We swallow per-cell errors + // (cell may already exist from a prior crashed attempt — that's recoverable) and only + // wait for one notification per cell before continuing. + var creationStreams = pendingForRound.Select(p => + { + var cell = new MeshNode(p.Id, threadPath) + { + NodeType = ThreadMessageNodeType.NodeType, + MainNode = mainEntity, + Content = p.Msg + }; + return meshService.CreateNode(cell) + .Take(1) + .Select(_ => true) + .Catch(ex => { - ThreadPath = threadPath, - UserMessageText = combinedUserText, - UserMessageId = dispatch.UserMessageIds.LastOrDefault(), - ResponseMessageId = responseMsgId, - ResponsePath = responsePath, - AgentName = dispatch.AgentName, - ModelName = dispatch.ModelName, - ContextPath = dispatch.ContextPath, - Attachments = dispatch.Attachments - }, - o => userCtx != null ? o.WithAccessContext(userCtx) : o); + logger?.LogDebug(ex, "[ThreadSubmission] User cell create returned error (may already exist) for {Path}", + $"{threadPath}/{p.Id}"); + return Observable.Return(true); + }); + }).ToList(); - return response; - }); + Observable.CombineLatest(creationStreams) + .Take(1) + .Subscribe( + _ => AfterUserCellsReady(), + ex => + { + logger?.LogWarning(ex, "[ThreadSubmission] User cell materialization failed for {ThreadPath}", threadPath); + onFailure?.Invoke(); + }); } } diff --git a/src/MeshWeaver.AI/UpdateThreadMessageContent.cs b/src/MeshWeaver.AI/UpdateThreadMessageContent.cs index ead099905..f038f9536 100644 --- a/src/MeshWeaver.AI/UpdateThreadMessageContent.cs +++ b/src/MeshWeaver.AI/UpdateThreadMessageContent.cs @@ -9,9 +9,28 @@ namespace MeshWeaver.AI; /// public record UpdateThreadMessageContent { + /// + /// Incremental text chunk to APPEND to the current message Text. The preferred shape + /// for streaming — each chunk just carries the new bytes since the previous update. + /// + public string? TextDelta { get; init; } + + /// + /// Full text replacement. Only set for final-state writes (completion text, error text, + /// cancel markers). Streaming should use instead. + /// public string? Text { get; init; } + public ImmutableList? ToolCalls { get; init; } public ImmutableList? UpdatedNodes { get; init; } public string? AgentName { get; init; } public string? ModelName { get; init; } + + /// Token usage from the model provider. Set on the final update of a round. + public int? InputTokens { get; init; } + public int? OutputTokens { get; init; } + public int? TotalTokens { get; init; } + + /// Wall-clock completion timestamp. Set on the final update of a round. + public DateTime? CompletedAt { get; init; } } diff --git a/src/MeshWeaver.Blazor.AI/McpMeshPlugin.cs b/src/MeshWeaver.Blazor.AI/McpMeshPlugin.cs index 61a15c10c..5fcd3d332 100644 --- a/src/MeshWeaver.Blazor.AI/McpMeshPlugin.cs +++ b/src/MeshWeaver.Blazor.AI/McpMeshPlugin.cs @@ -69,6 +69,18 @@ public string NavigateTo( var resolvedPath = MeshOperations.ResolvePath(path); return $"{baseUrl}/node/{Uri.EscapeDataString(resolvedPath)}"; } + + [McpServerTool] + [Description("Returns compilation diagnostics for a NodeType (or any instance of one). Status is 'Ok' when the type compiled cleanly, 'Error' with details when it failed, 'Compiling' while a compile is in progress (with elapsedMs), or 'Unknown' when no compile has happened yet. Use after creating/updating a NodeType to verify it actually compiles — a NodeType that doesn't compile is not 'done'.")] + public Task GetDiagnostics( + [Description("Path to a NodeType (e.g., @Systemorph/SocialMedia/Profile) or to any instance of one")] string path) + => ops.GetDiagnostics(path); + + [McpServerTool] + [Description("Recycles the hub at the given path by posting DisposeRequest. Forces a fresh hub initialization on the next access — use after fixing a broken NodeType, after editing the `sources` list, or whenever a grain is stuck in a cached bad state. Returns {status:'Recycled', path}. Wait ~100ms before the next access so the grain teardown completes.")] + public Task Recycle( + [Description("Path to the node (e.g., @Systemorph/SocialMedia/Profile). Use the NodeType path to recycle the whole type; use an instance path to recycle just that instance's hub.")] string path) + => ops.Recycle(path); } /// diff --git a/src/MeshWeaver.Blazor.Portal/Chat/ThreadChatView.razor b/src/MeshWeaver.Blazor.Portal/Chat/ThreadChatView.razor index 5f75538b5..20b63200b 100644 --- a/src/MeshWeaver.Blazor.Portal/Chat/ThreadChatView.razor +++ b/src/MeshWeaver.Blazor.Portal/Chat/ThreadChatView.razor @@ -32,6 +32,13 @@ else @if (!ViewModel.HideEmptyState) {
+ @{ + var header = GetHeaderCell(); + } + @if (header != null) + { + + } @if (ThreadMessages.Count > 0) { @foreach (var msgId in ThreadMessages) @@ -94,6 +101,21 @@ else
} -@if (showSubmissionProgress) -{ -
-
- - Creating conversation... -
-
-}