diff --git a/forester/.gitignore b/forester/.gitignore index 5a66a2f170..71ed41a6c3 100644 --- a/forester/.gitignore +++ b/forester/.gitignore @@ -3,6 +3,7 @@ logs .idea .env .env.devnet +.env.mainnet *.json !package.json spawn.sh diff --git a/forester/Dockerfile b/forester/Dockerfile index 79b3f06f36..462de1ecb4 100644 --- a/forester/Dockerfile +++ b/forester/Dockerfile @@ -14,9 +14,8 @@ RUN cargo build --release --package forester FROM debian:bookworm-slim RUN apt-get update && apt-get install -y ca-certificates libssl3 && rm -rf /var/lib/apt/lists/* -RUN mkdir -p /app/config /app/static +RUN mkdir -p /app/config COPY --from=builder /app/target/release/forester /usr/local/bin/forester -COPY --from=builder /app/forester/static /app/static WORKDIR /app ENTRYPOINT ["/usr/local/bin/forester"] diff --git a/forester/dashboard/.env.local.example b/forester/dashboard/.env.local.example new file mode 100644 index 0000000000..9f0c9e833f --- /dev/null +++ b/forester/dashboard/.env.local.example @@ -0,0 +1 @@ +NEXT_PUBLIC_FORESTER_API_URL=http://localhost:8080 diff --git a/forester/dashboard/.gitignore b/forester/dashboard/.gitignore new file mode 100644 index 0000000000..ded1a3f3a0 --- /dev/null +++ b/forester/dashboard/.gitignore @@ -0,0 +1,4 @@ +node_modules/ +.next/ +out/ +.env.local diff --git a/forester/dashboard/Dockerfile b/forester/dashboard/Dockerfile new file mode 100644 index 0000000000..fe6ba0439e --- /dev/null +++ b/forester/dashboard/Dockerfile @@ -0,0 +1,26 @@ +FROM node:20-alpine AS base + +FROM base AS deps +WORKDIR /app +COPY package.json package-lock.json* ./ +RUN npm ci + +FROM base AS builder +WORKDIR /app +COPY --from=deps /app/node_modules ./node_modules +COPY . . +RUN npm run build + +FROM base AS runner +WORKDIR /app +ENV NODE_ENV=production +RUN addgroup --system --gid 1001 nodejs +RUN adduser --system --uid 1001 nextjs +COPY --from=builder /app/public ./public +COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./ +COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static + +USER nextjs +EXPOSE 3000 +ENV PORT=3000 +CMD ["node", "server.js"] diff --git a/forester/dashboard/next-env.d.ts b/forester/dashboard/next-env.d.ts new file mode 100644 index 0000000000..40c3d68096 --- /dev/null +++ b/forester/dashboard/next-env.d.ts @@ -0,0 +1,5 @@ +/// +/// + +// NOTE: This file should not be edited +// see https://nextjs.org/docs/app/building-your-application/configuring/typescript for more information. diff --git a/forester/dashboard/next.config.mjs b/forester/dashboard/next.config.mjs new file mode 100644 index 0000000000..8c8bab67b5 --- /dev/null +++ b/forester/dashboard/next.config.mjs @@ -0,0 +1,6 @@ +/** @type {import('next').NextConfig} */ +const nextConfig = { + output: "standalone", +}; + +export default nextConfig; diff --git a/forester/dashboard/package.json b/forester/dashboard/package.json new file mode 100644 index 0000000000..2c70cedc7c --- /dev/null +++ b/forester/dashboard/package.json @@ -0,0 +1,26 @@ +{ + "name": "forester-dashboard", + "version": "0.1.0", + "private": true, + "scripts": { + "dev": "next dev", + "build": "next build", + "start": "next start", + "lint": "next lint" + }, + "dependencies": { + "next": "^14.2.0", + "react": "^18.3.0", + "react-dom": "^18.3.0", + "swr": "^2.2.0" + }, + "devDependencies": { + "@types/node": "^20.0.0", + "@types/react": "^18.3.0", + "@types/react-dom": "^18.3.0", + "autoprefixer": "^10.4.0", + "postcss": "^8.4.0", + "tailwindcss": "^3.4.0", + "typescript": "^5.0.0" + } +} diff --git a/forester/dashboard/postcss.config.mjs b/forester/dashboard/postcss.config.mjs new file mode 100644 index 0000000000..2ef30fcf42 --- /dev/null +++ b/forester/dashboard/postcss.config.mjs @@ -0,0 +1,9 @@ +/** @type {import('postcss-load-config').Config} */ +const config = { + plugins: { + tailwindcss: {}, + autoprefixer: {}, + }, +}; + +export default config; diff --git a/forester/dashboard/src/app/compressible/page.tsx b/forester/dashboard/src/app/compressible/page.tsx new file mode 100644 index 0000000000..8ea4fc3274 --- /dev/null +++ b/forester/dashboard/src/app/compressible/page.tsx @@ -0,0 +1,30 @@ +"use client"; + +import { useCompressible } from "@/hooks/useCompressible"; +import { ErrorState } from "@/components/ErrorState"; +import { CompressiblePanel } from "@/components/CompressiblePanel"; + +export default function CompressiblePage() { + const { data, error, isLoading } = useCompressible(); + + if (isLoading) { + return ( +
+
+ Loading compressible status... +
+
+ ); + } + + if (error || !data) { + return ; + } + + return ( +
+

Compressible Accounts

+ +
+ ); +} diff --git a/forester/dashboard/src/app/globals.css b/forester/dashboard/src/app/globals.css new file mode 100644 index 0000000000..b5c61c9567 --- /dev/null +++ b/forester/dashboard/src/app/globals.css @@ -0,0 +1,3 @@ +@tailwind base; +@tailwind components; +@tailwind utilities; diff --git a/forester/dashboard/src/app/layout.tsx b/forester/dashboard/src/app/layout.tsx new file mode 100644 index 0000000000..9a3889aff9 --- /dev/null +++ b/forester/dashboard/src/app/layout.tsx @@ -0,0 +1,25 @@ +import type { Metadata } from "next"; +import { Sidebar } from "@/components/Sidebar"; +import "./globals.css"; + +export const metadata: Metadata = { + title: "Forester Dashboard", + description: "Light Protocol Forester monitoring dashboard", +}; + +export default function RootLayout({ + children, +}: { + children: React.ReactNode; +}) { + return ( + + +
+ +
{children}
+
+ + + ); +} diff --git a/forester/dashboard/src/app/metrics/page.tsx b/forester/dashboard/src/app/metrics/page.tsx new file mode 100644 index 0000000000..6d2ea64f14 --- /dev/null +++ b/forester/dashboard/src/app/metrics/page.tsx @@ -0,0 +1,44 @@ +"use client"; + +import { useMetrics } from "@/hooks/useMetrics"; +import { ErrorState } from "@/components/ErrorState"; +import { MetricsPanel } from "@/components/MetricsPanel"; + +export default function MetricsPage() { + const { data: metrics, error, isLoading } = useMetrics(); + + if (isLoading) { + return ( +
+
Loading metrics...
+
+ ); + } + + if (error || !metrics) { + return ; + } + + const isEmpty = + Object.keys(metrics.transactions_processed_total).length === 0 && + Object.keys(metrics.forester_balances).length === 0; + + return ( +
+

Metrics

+ {isEmpty ? ( +
+

+ No metrics data available. +

+

+ Configure --prometheus-url to query aggregated metrics, or connect + to a running forester instance. +

+
+ ) : ( + + )} +
+ ); +} diff --git a/forester/dashboard/src/app/page.tsx b/forester/dashboard/src/app/page.tsx new file mode 100644 index 0000000000..72ae9da8ae --- /dev/null +++ b/forester/dashboard/src/app/page.tsx @@ -0,0 +1,103 @@ +"use client"; + +import { useForesterStatus } from "@/hooks/useForesterStatus"; +import { EpochCard } from "@/components/EpochCard"; +import { ErrorState } from "@/components/ErrorState"; +import { ForesterList } from "@/components/ForesterList"; +import { QueuePressureChart } from "@/components/QueuePressureChart"; +import { formatNumber } from "@/lib/utils"; + +export default function OverviewPage() { + const { data: status, error, isLoading } = useForesterStatus(); + + if (isLoading) { + return ( +
+
Loading forester status...
+
+ ); + } + + if (error || !status) { + return ; + } + + const warnings: string[] = []; + if (status.active_epoch_foresters.length === 0) { + warnings.push("No foresters registered for the active epoch"); + } + if ( + status.slots_until_next_registration < 1000 && + status.registration_epoch_foresters.length === 0 + ) { + warnings.push("Registration closing soon with no foresters registered"); + } + + return ( +
+
+

Overview

+ + Slot {formatNumber(status.slot)} + +
+ + {warnings.map((w, i) => ( +
+ {w} +
+ ))} + +
+ + + + 0} + /> +
+ +
+ + +
+ +
+ + +
+
+ ); +} + +function StatCard({ + label, + value, + highlight, +}: { + label: string; + value: number; + highlight?: boolean; +}) { + return ( +
+
{label}
+
+ {formatNumber(value)} +
+
+ ); +} diff --git a/forester/dashboard/src/app/trees/page.tsx b/forester/dashboard/src/app/trees/page.tsx new file mode 100644 index 0000000000..2c21bbca75 --- /dev/null +++ b/forester/dashboard/src/app/trees/page.tsx @@ -0,0 +1,32 @@ +"use client"; + +import { useForesterStatus } from "@/hooks/useForesterStatus"; +import { ErrorState } from "@/components/ErrorState"; +import { TreeTable } from "@/components/TreeTable"; + +export default function TreesPage() { + const { data: status, error, isLoading } = useForesterStatus(); + + if (isLoading) { + return ( +
+
Loading trees...
+
+ ); + } + + if (error || !status) { + return ; + } + + return ( +
+

Trees

+ +
+ ); +} diff --git a/forester/dashboard/src/components/CompressiblePanel.tsx b/forester/dashboard/src/components/CompressiblePanel.tsx new file mode 100644 index 0000000000..b753e99c41 --- /dev/null +++ b/forester/dashboard/src/components/CompressiblePanel.tsx @@ -0,0 +1,59 @@ +import type { CompressibleResponse } from "@/types/forester"; +import { formatNumber } from "@/lib/utils"; + +interface CompressiblePanelProps { + data: CompressibleResponse; +} + +export function CompressiblePanel({ data }: CompressiblePanelProps) { + if (!data.enabled) { + return ( +
+

+ No compressible account data available. +

+

+ The dashboard will query on-chain data automatically. If this + persists, check the RPC connection. +

+
+ ); + } + + const cards = [ + { + label: "CToken Accounts", + value: data.ctoken_count, + desc: "Compressed token accounts tracked", + }, + { + label: "PDA Accounts", + value: data.pda_count, + desc: "Program-derived accounts tracked", + }, + { + label: "Mint Accounts", + value: data.mint_count, + desc: "Mint accounts tracked", + }, + ]; + + return ( +
+
+ {cards.map((card) => ( +
+
{card.label}
+
+ {card.value != null ? formatNumber(card.value) : "-"} +
+
{card.desc}
+
+ ))} +
+
+ ); +} diff --git a/forester/dashboard/src/components/EpochCard.tsx b/forester/dashboard/src/components/EpochCard.tsx new file mode 100644 index 0000000000..78d9e11022 --- /dev/null +++ b/forester/dashboard/src/components/EpochCard.tsx @@ -0,0 +1,60 @@ +import type { ForesterStatus } from "@/types/forester"; +import { ProgressBar } from "./ProgressBar"; +import { formatPercentage, slotsToTime } from "@/lib/utils"; + +interface EpochCardProps { + status: ForesterStatus; +} + +export function EpochCard({ status }: EpochCardProps) { + return ( +
+

+ Epoch Status +

+
+
+
+ Active Epoch {status.current_active_epoch} + + {formatPercentage(status.active_epoch_progress_percentage)} + +
+ +
+
+
+ Next Epoch +

{status.hours_until_next_epoch}h

+
+
+ Registration Epoch +

{status.current_registration_epoch}

+
+
+ Next Registration +

+ {slotsToTime(status.slots_until_next_registration)} +

+
+
+ Light Slot +

+ {status.current_light_slot != null + ? `${status.current_light_slot} / ${status.total_light_slots}` + : "N/A"} +

+
+
+ {status.current_light_slot != null && + status.slots_until_next_light_slot != null && ( +
+ Next light slot in{" "} + {slotsToTime(status.slots_until_next_light_slot)} ( + {status.slots_until_next_light_slot} slots) +
+ )} +
+
+ ); +} diff --git a/forester/dashboard/src/components/ErrorState.tsx b/forester/dashboard/src/components/ErrorState.tsx new file mode 100644 index 0000000000..17bca4c6f3 --- /dev/null +++ b/forester/dashboard/src/components/ErrorState.tsx @@ -0,0 +1,49 @@ +interface ErrorStateProps { + error: Error | undefined; + fallbackMessage?: string; +} + +export function ErrorState({ error, fallbackMessage }: ErrorStateProps) { + if (!error) { + return ( +
+
+

Error

+

+ {fallbackMessage || "An unexpected error occurred"} +

+
+
+ ); + } + + // ApiError sets name="ApiError" and message starts with "Forester API returned" + const isApiError = + error.name === "ApiError" || + error.message.startsWith("Forester API returned"); + + return ( +
+
+ {isApiError ? ( + <> +

API Error

+

{error.message}

+

+ The forester API server is reachable but returned an error. Check + that --rpc-url points to a valid Solana RPC with Light Protocol + deployed. +

+ + ) : ( + <> +

+ Connection Error +

+

{error.message}

+ + )} +
+
+ ); +} diff --git a/forester/dashboard/src/components/ForesterList.tsx b/forester/dashboard/src/components/ForesterList.tsx new file mode 100644 index 0000000000..b6a56736aa --- /dev/null +++ b/forester/dashboard/src/components/ForesterList.tsx @@ -0,0 +1,40 @@ +import type { ForesterInfo } from "@/types/forester"; +import { truncateAddress, formatSol } from "@/lib/utils"; + +interface ForesterListProps { + title: string; + foresters: ForesterInfo[]; +} + +export function ForesterList({ title, foresters }: ForesterListProps) { + return ( +
+

+ {title}{" "} + ({foresters.length}) +

+ {foresters.length === 0 ? ( +

No foresters registered

+ ) : ( +
+ {foresters.map((f, i) => { + const low = f.balance_sol != null && f.balance_sol < 0.1; + return ( +
+ + {truncateAddress(f.authority, 6)} + + + {formatSol(f.balance_sol)} + +
+ ); + })} +
+ )} +
+ ); +} diff --git a/forester/dashboard/src/components/MetricsPanel.tsx b/forester/dashboard/src/components/MetricsPanel.tsx new file mode 100644 index 0000000000..7ce372b42c --- /dev/null +++ b/forester/dashboard/src/components/MetricsPanel.tsx @@ -0,0 +1,125 @@ +import type { MetricsResponse } from "@/types/forester"; +import { formatNumber, formatSol } from "@/lib/utils"; + +interface MetricsPanelProps { + metrics: MetricsResponse; +} + +export function MetricsPanel({ metrics }: MetricsPanelProps) { + const totalTx = Object.values(metrics.transactions_processed_total).reduce( + (a, b) => a + b, + 0 + ); + const rates = Object.entries(metrics.transaction_rate); + const balances = Object.entries(metrics.forester_balances); + const queues = Object.entries(metrics.queue_lengths); + + return ( +
+
+ + 0 + ? new Date(metrics.last_run_timestamp * 1000).toLocaleString() + : "N/A" + } + /> + +
+ + {rates.length > 0 && ( +
+
+ {rates.map(([epoch, rate]) => ( +
+
Epoch {epoch}
+
+ {rate.toFixed(2)} tx/s +
+
+ {formatNumber( + metrics.transactions_processed_total[epoch] ?? 0 + )}{" "} + total +
+
+ ))} +
+
+ )} + + {balances.length > 0 && ( +
+
+ {balances.map(([pubkey, balance]) => ( +
+ + {pubkey.slice(0, 8)}...{pubkey.slice(-4)} + + + {formatSol(balance)} + +
+ ))} +
+
+ )} + + {queues.length > 0 && ( +
+
+ {queues.map(([tree, length]) => ( +
+ + {tree.slice(0, 8)}...{tree.slice(-4)} + + + {formatNumber(length)} + +
+ ))} +
+
+ )} +
+ ); +} + +function StatCard({ label, value }: { label: string; value: string }) { + return ( +
+
{label}
+
{value}
+
+ ); +} + +function Section({ + title, + children, +}: { + title: string; + children: React.ReactNode; +}) { + return ( +
+

{title}

+ {children} +
+ ); +} diff --git a/forester/dashboard/src/components/ProgressBar.tsx b/forester/dashboard/src/components/ProgressBar.tsx new file mode 100644 index 0000000000..c4ec9018ee --- /dev/null +++ b/forester/dashboard/src/components/ProgressBar.tsx @@ -0,0 +1,28 @@ +interface ProgressBarProps { + value: number; + max?: number; + className?: string; + barColor?: string; +} + +export function ProgressBar({ + value, + max = 100, + className = "", + barColor, +}: ProgressBarProps) { + const pct = max > 0 ? Math.min((value / max) * 100, 100) : 0; + const color = + barColor ?? (pct > 90 ? "bg-red-500" : pct > 70 ? "bg-amber-500" : "bg-blue-500"); + + return ( +
+
+
+ ); +} diff --git a/forester/dashboard/src/components/QueuePressureChart.tsx b/forester/dashboard/src/components/QueuePressureChart.tsx new file mode 100644 index 0000000000..5b6d15eefa --- /dev/null +++ b/forester/dashboard/src/components/QueuePressureChart.tsx @@ -0,0 +1,73 @@ +import type { AggregateQueueStats } from "@/types/forester"; +import { formatNumber } from "@/lib/utils"; + +interface QueuePressureChartProps { + stats: AggregateQueueStats; +} + +const entries: { + key: keyof AggregateQueueStats; + label: string; + color: string; +}[] = [ + { + key: "state_v1_total_pending", + label: "State V1", + color: "bg-purple-500", + }, + { + key: "state_v2_input_pending", + label: "State V2 Input", + color: "bg-indigo-500", + }, + { + key: "state_v2_output_pending", + label: "State V2 Output", + color: "bg-indigo-300", + }, + { + key: "address_v1_total_pending", + label: "Addr V1", + color: "bg-teal-500", + }, + { + key: "address_v2_input_pending", + label: "Addr V2 Input", + color: "bg-cyan-500", + }, +]; + +export function QueuePressureChart({ stats }: QueuePressureChartProps) { + const maxVal = Math.max( + ...entries.map((e) => stats[e.key]), + 1 + ); + + return ( +
+

+ Queue Pressure +

+
+ {entries.map((e) => { + const val = stats[e.key]; + const pct = (val / maxVal) * 100; + return ( +
+
+ {e.label} + {formatNumber(val)} +
+
+
+
+
+ ); + })} +
+
+ ); +} diff --git a/forester/dashboard/src/components/Sidebar.tsx b/forester/dashboard/src/components/Sidebar.tsx new file mode 100644 index 0000000000..b4290b8c93 --- /dev/null +++ b/forester/dashboard/src/components/Sidebar.tsx @@ -0,0 +1,42 @@ +"use client"; + +import Link from "next/link"; +import { usePathname } from "next/navigation"; + +const navItems = [ + { href: "/", label: "Overview" }, + { href: "/trees", label: "Trees" }, + { href: "/metrics", label: "Metrics" }, + { href: "/compressible", label: "Compressible" }, +]; + +export function Sidebar() { + const pathname = usePathname(); + + return ( + + ); +} diff --git a/forester/dashboard/src/components/StatusBadge.tsx b/forester/dashboard/src/components/StatusBadge.tsx new file mode 100644 index 0000000000..261c2b29e5 --- /dev/null +++ b/forester/dashboard/src/components/StatusBadge.tsx @@ -0,0 +1,17 @@ +interface StatusBadgeProps { + label: string; + color?: string; +} + +export function StatusBadge({ + label, + color = "bg-gray-100 text-gray-800", +}: StatusBadgeProps) { + return ( + + {label} + + ); +} diff --git a/forester/dashboard/src/components/TreeBatchDetail.tsx b/forester/dashboard/src/components/TreeBatchDetail.tsx new file mode 100644 index 0000000000..99114078d1 --- /dev/null +++ b/forester/dashboard/src/components/TreeBatchDetail.tsx @@ -0,0 +1,88 @@ +import type { V2QueueInfo } from "@/types/forester"; +import { StatusBadge } from "./StatusBadge"; +import { ProgressBar } from "./ProgressBar"; +import { + formatNumber, + batchStateLabel, + batchStateColor, +} from "@/lib/utils"; + +interface TreeBatchDetailProps { + info: V2QueueInfo; +} + +export function TreeBatchDetail({ info }: TreeBatchDetailProps) { + return ( +
+
+
+ ZKP Batch Size +

{formatNumber(info.zkp_batch_size)}

+
+
+ Input Pending Batches +

{info.input_pending_batches}

+
+
+ Output Pending Batches +

{info.output_pending_batches}

+
+
+ + {info.batches.length > 0 && ( + + + + + + + + + + + + + {info.batches.map((batch) => ( + + + + + + + + + ))} + +
BatchStateInsertedIndexPendingZKP Fill
+ #{batch.batch_index} + + + + {formatNumber(batch.num_inserted)} + + {formatNumber(batch.current_index)} + + {formatNumber(batch.pending)} + +
+ + + {batch.items_in_current_zkp_batch}/{info.zkp_batch_size} + +
+
+ )} +
+ ); +} diff --git a/forester/dashboard/src/components/TreeTable.tsx b/forester/dashboard/src/components/TreeTable.tsx new file mode 100644 index 0000000000..c49d365c99 --- /dev/null +++ b/forester/dashboard/src/components/TreeTable.tsx @@ -0,0 +1,243 @@ +"use client"; + +import { useState, useMemo } from "react"; +import type { TreeStatus, ForesterInfo } from "@/types/forester"; +import { StatusBadge } from "./StatusBadge"; +import { ProgressBar } from "./ProgressBar"; +import { TreeBatchDetail } from "./TreeBatchDetail"; +import { + truncateAddress, + formatNumber, + formatPercentage, + treeTypeColor, +} from "@/lib/utils"; + +interface TreeTableProps { + trees: TreeStatus[]; + foresters: ForesterInfo[]; + currentLightSlot: number | null; +} + +type SortKey = "type" | "fullness" | "pending"; +type FilterType = "all" | "StateV1" | "StateV2" | "AddressV1" | "AddressV2"; + +export function TreeTable({ + trees, + foresters, + currentLightSlot, +}: TreeTableProps) { + const [filter, setFilter] = useState("all"); + const [sortBy, setSortBy] = useState("pending"); + const [expanded, setExpanded] = useState(null); + const [showRolledOver, setShowRolledOver] = useState(false); + + const filtered = useMemo(() => { + let result = trees; + if (!showRolledOver) { + result = result.filter((t) => !t.is_rolledover); + } + if (filter !== "all") { + result = result.filter((t) => t.tree_type === filter); + } + return [...result].sort((a, b) => { + switch (sortBy) { + case "type": + return a.tree_type.localeCompare(b.tree_type); + case "fullness": + return b.fullness_percentage - a.fullness_percentage; + case "pending": + return (b.queue_length ?? 0) - (a.queue_length ?? 0); + default: + return 0; + } + }); + }, [trees, filter, sortBy, showRolledOver]); + + const filters: FilterType[] = [ + "all", + "StateV1", + "StateV2", + "AddressV1", + "AddressV2", + ]; + + return ( +
+
+ {filters.map((f) => ( + + ))} +
+ + +
+
+ +
+ + + + + + + + + + + + + + {filtered.map((tree) => { + const isExpanded = expanded === tree.merkle_tree; + return ( + + + setExpanded(isExpanded ? null : tree.merkle_tree) + } + > + + + + + + + + + {isExpanded && tree.v2_queue_info && ( + + + + )} + + ); + })} + +
TypeAddressFullnessIndex / CapPendingForesterSchedule
+ + + {truncateAddress(tree.merkle_tree, 6)} + +
+ + + {formatPercentage(tree.fullness_percentage)} + +
+
+ {formatNumber(tree.next_index)} /{" "} + {formatNumber(tree.capacity)} + + {tree.v2_queue_info ? ( + + I:{tree.v2_queue_info.input_pending_batches * + tree.v2_queue_info.zkp_batch_size}{" "} + {tree.tree_type === "StateV2" && ( + <> + O: + {tree.v2_queue_info.output_pending_batches * + tree.v2_queue_info.zkp_batch_size} + + )} + + ) : ( + + {tree.queue_length != null + ? formatNumber(tree.queue_length) + : "-"} + + )} + + {tree.assigned_forester + ? truncateAddress(tree.assigned_forester, 4) + : "-"} + + +
+ +
+ {filtered.length === 0 && ( +

+ No trees match the current filter +

+ )} +
+
+ ); +} + +import { Fragment } from "react"; + +function ScheduleGrid({ + schedule, + currentSlot, +}: { + schedule: (number | null)[]; + currentSlot: number | null; +}) { + if (schedule.length === 0) + return -; + + // Show a compact view: only around the current slot + const start = currentSlot != null ? Math.max(0, currentSlot - 2) : 0; + const visible = schedule.slice(start, start + 8); + + return ( +
+ {visible.map((slot, i) => { + const idx = start + i; + const isCurrent = idx === currentSlot; + return ( +
+ ); + })} + {schedule.length > 8 && ( + +{schedule.length - 8} + )} +
+ ); +} diff --git a/forester/dashboard/src/hooks/useCompressible.ts b/forester/dashboard/src/hooks/useCompressible.ts new file mode 100644 index 0000000000..1158185f79 --- /dev/null +++ b/forester/dashboard/src/hooks/useCompressible.ts @@ -0,0 +1,10 @@ +import useSWR from "swr"; +import { fetcher } from "@/lib/api"; +import type { CompressibleResponse } from "@/types/forester"; + +export function useCompressible() { + return useSWR("/compressible", fetcher, { + refreshInterval: 15000, + revalidateOnFocus: true, + }); +} diff --git a/forester/dashboard/src/hooks/useForesterStatus.ts b/forester/dashboard/src/hooks/useForesterStatus.ts new file mode 100644 index 0000000000..eb58f5b9c2 --- /dev/null +++ b/forester/dashboard/src/hooks/useForesterStatus.ts @@ -0,0 +1,16 @@ +import useSWR from "swr"; +import { fetcher } from "@/lib/api"; +import type { ForesterStatus } from "@/types/forester"; + +export function useForesterStatus() { + return useSWR("/status", fetcher, { + refreshInterval: (data) => { + if (data?.slots_until_next_light_slot) { + const ms = data.slots_until_next_light_slot * 0.46 * 1000 + 500; + return Math.max(2000, Math.min(ms, 10000)); + } + return 10000; + }, + revalidateOnFocus: true, + }); +} diff --git a/forester/dashboard/src/hooks/useMetrics.ts b/forester/dashboard/src/hooks/useMetrics.ts new file mode 100644 index 0000000000..481be5de44 --- /dev/null +++ b/forester/dashboard/src/hooks/useMetrics.ts @@ -0,0 +1,10 @@ +import useSWR from "swr"; +import { fetcher } from "@/lib/api"; +import type { MetricsResponse } from "@/types/forester"; + +export function useMetrics() { + return useSWR("/metrics/json", fetcher, { + refreshInterval: 10000, + revalidateOnFocus: true, + }); +} diff --git a/forester/dashboard/src/lib/api.ts b/forester/dashboard/src/lib/api.ts new file mode 100644 index 0000000000..186359c71f --- /dev/null +++ b/forester/dashboard/src/lib/api.ts @@ -0,0 +1,42 @@ +const API_URL = + process.env.NEXT_PUBLIC_FORESTER_API_URL || "http://localhost:8080"; + +export class ApiError extends Error { + constructor( + message: string, + public status: number, + public serverMessage: string + ) { + super(message); + this.name = "ApiError"; + } +} + +export async function fetchApi(path: string): Promise { + let res: Response; + try { + res = await fetch(`${API_URL}${path}`); + } catch (e) { + // Network error — server not reachable + throw new Error( + `Cannot connect to forester API at ${API_URL}. Make sure the API server is running.` + ); + } + if (!res.ok) { + let serverMsg = ""; + try { + const body = await res.json(); + serverMsg = body.error || JSON.stringify(body); + } catch { + serverMsg = await res.text().catch(() => "Unknown error"); + } + throw new ApiError( + `Forester API returned ${res.status}: ${serverMsg}`, + res.status, + serverMsg + ); + } + return res.json(); +} + +export const fetcher = (path: string) => fetchApi(path); diff --git a/forester/dashboard/src/lib/utils.ts b/forester/dashboard/src/lib/utils.ts new file mode 100644 index 0000000000..0249948133 --- /dev/null +++ b/forester/dashboard/src/lib/utils.ts @@ -0,0 +1,67 @@ +export function truncateAddress(addr: string, chars = 4): string { + if (addr.length <= chars * 2 + 3) return addr; + return `${addr.slice(0, chars)}...${addr.slice(-chars)}`; +} + +export function formatSol(lamports: number | null | undefined): string { + if (lamports == null) return "-"; + return `${lamports.toFixed(4)} SOL`; +} + +export function formatNumber(n: number): string { + return n.toLocaleString(); +} + +export function formatPercentage(n: number, decimals = 2): string { + return `${n.toFixed(decimals)}%`; +} + +export function slotsToTime(slots: number): string { + const seconds = Math.round(slots * 0.46); + if (seconds < 60) return `${seconds}s`; + if (seconds < 3600) return `${Math.round(seconds / 60)}m`; + const hours = Math.floor(seconds / 3600); + const mins = Math.round((seconds % 3600) / 60); + return mins > 0 ? `${hours}h ${mins}m` : `${hours}h`; +} + +export function batchStateLabel(state: number): string { + switch (state) { + case 0: + return "Fill"; + case 1: + return "Inserted"; + case 2: + return "Full"; + default: + return `Unknown(${state})`; + } +} + +export function batchStateColor(state: number): string { + switch (state) { + case 0: + return "bg-blue-100 text-blue-800"; + case 1: + return "bg-green-100 text-green-800"; + case 2: + return "bg-amber-100 text-amber-800"; + default: + return "bg-gray-100 text-gray-800"; + } +} + +export function treeTypeColor(type: string): string { + switch (type) { + case "StateV1": + return "bg-purple-100 text-purple-800"; + case "StateV2": + return "bg-indigo-100 text-indigo-800"; + case "AddressV1": + return "bg-teal-100 text-teal-800"; + case "AddressV2": + return "bg-cyan-100 text-cyan-800"; + default: + return "bg-gray-100 text-gray-800"; + } +} diff --git a/forester/dashboard/src/types/forester.ts b/forester/dashboard/src/types/forester.ts new file mode 100644 index 0000000000..c9f047dfe4 --- /dev/null +++ b/forester/dashboard/src/types/forester.ts @@ -0,0 +1,88 @@ +export interface ForesterInfo { + authority: string; + balance_sol: number | null; +} + +export interface BatchInfo { + batch_index: number; + batch_state: number; + num_inserted: number; + current_index: number; + pending: number; + items_in_current_zkp_batch: number; +} + +export interface V2QueueInfo { + next_index: number; + pending_batch_index: number; + zkp_batch_size: number; + batches: BatchInfo[]; + input_pending_batches: number; + output_pending_batches: number; + input_items_in_current_zkp_batch: number; + output_items_in_current_zkp_batch: number; +} + +export interface TreeStatus { + tree_type: string; + merkle_tree: string; + queue: string; + fullness_percentage: number; + next_index: number; + capacity: number; + height: number; + threshold: number; + is_rolledover: boolean; + queue_length: number | null; + v2_queue_info: V2QueueInfo | null; + assigned_forester: string | null; + schedule: (number | null)[]; + owner: string; +} + +export interface AggregateQueueStats { + state_v1_total_pending: number; + state_v2_input_pending: number; + state_v2_output_pending: number; + address_v1_total_pending: number; + address_v2_input_pending: number; +} + +export interface ForesterStatus { + slot: number; + current_active_epoch: number; + current_registration_epoch: number; + active_epoch_progress: number; + active_phase_length: number; + active_epoch_progress_percentage: number; + hours_until_next_epoch: number; + slots_until_next_registration: number; + hours_until_next_registration: number; + active_epoch_foresters: ForesterInfo[]; + registration_epoch_foresters: ForesterInfo[]; + trees: TreeStatus[]; + current_light_slot: number | null; + light_slot_length: number; + slots_until_next_light_slot: number | null; + total_light_slots: number; + total_trees: number; + active_trees: number; + rolled_over_trees: number; + total_pending_items: number; + aggregate_queue_stats: AggregateQueueStats; +} + +export interface MetricsResponse { + transactions_processed_total: Record; + transaction_rate: Record; + last_run_timestamp: number; + forester_balances: Record; + queue_lengths: Record; +} + +export interface CompressibleResponse { + enabled: boolean; + ctoken_count?: number; + pda_count?: number; + mint_count?: number; +} diff --git a/forester/dashboard/tailwind.config.ts b/forester/dashboard/tailwind.config.ts new file mode 100644 index 0000000000..8b799fd172 --- /dev/null +++ b/forester/dashboard/tailwind.config.ts @@ -0,0 +1,11 @@ +import type { Config } from "tailwindcss"; + +const config: Config = { + content: ["./src/**/*.{js,ts,jsx,tsx,mdx}"], + theme: { + extend: {}, + }, + plugins: [], +}; + +export default config; diff --git a/forester/src/api_server.rs b/forester/src/api_server.rs index 22aeeaceb6..e83b0f04e4 100644 --- a/forester/src/api_server.rs +++ b/forester/src/api_server.rs @@ -1,11 +1,15 @@ -use std::{collections::HashMap, net::SocketAddr, thread::JoinHandle, time::Duration}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, thread::JoinHandle, time::Duration}; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; +use tokio::sync::{broadcast, oneshot, watch}; use tracing::{error, info, warn}; -use warp::{http::StatusCode, Filter}; +use warp::Filter; -use crate::{forester_status::get_forester_status, metrics::REGISTRY}; +use crate::{ + compressible::{CTokenAccountTracker, MintAccountTracker, PdaAccountTracker}, + forester_status::get_forester_status, + metrics::REGISTRY, +}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HealthResponse { @@ -17,7 +21,7 @@ pub struct ErrorResponse { pub error: String, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct MetricsResponse { pub transactions_processed_total: HashMap, pub transaction_rate: HashMap, @@ -26,11 +30,90 @@ pub struct MetricsResponse { pub queue_lengths: HashMap, } -const DASHBOARD_HTML: &str = include_str!("../static/dashboard.html"); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressibleResponse { + pub enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub ctoken_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pda_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub mint_count: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MetricsSnapshot { + #[serde(flatten)] + pub data: MetricsResponse, + pub source: String, + pub cached_at: i64, +} + +impl MetricsSnapshot { + fn empty() -> Self { + Self { + data: MetricsResponse::default(), + source: "none".to_string(), + cached_at: 0, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressibleSnapshot { + #[serde(flatten)] + pub data: CompressibleResponse, + pub source: String, + pub cached_at: i64, +} + +impl CompressibleSnapshot { + fn empty() -> Self { + Self { + data: CompressibleResponse { + enabled: false, + ctoken_count: None, + pda_count: None, + mint_count: None, + }, + source: "none".to_string(), + cached_at: 0, + } + } +} + +#[derive(Clone)] +pub(crate) struct CompressibleTrackers { + pub ctoken: Option>, + pub pda: Option>, + pub mint: Option>, +} + +/// Holds optional references to compressible trackers for the API server. +pub struct CompressibleDashboardState { + pub ctoken_tracker: Option>, + pub pda_tracker: Option>, + pub mint_tracker: Option>, +} + +/// Configuration for the HTTP API server. +pub struct ApiServerConfig { + pub rpc_url: String, + pub port: u16, + pub allow_public_bind: bool, + pub compressible_state: Option, + pub prometheus_url: Option, +} /// Default timeout for status endpoint in seconds const STATUS_TIMEOUT_SECS: u64 = 30; +/// Timeout for external HTTP requests (Prometheus queries) +const EXTERNAL_HTTP_TIMEOUT: Duration = Duration::from_secs(15); + +/// Overall timeout for on-chain compressible count fetch (paginated, can be slow) +const COMPRESSIBLE_FETCH_TIMEOUT: Duration = Duration::from_secs(90); + /// Handle returned by spawn_api_server for graceful shutdown pub struct ApiServerHandle { /// Thread handle for the API server @@ -51,16 +134,190 @@ impl ApiServerHandle { } } +// --------------------------------------------------------------------------- +// Extracted fetch functions (testable, no warp / no Mutex / no cache) +// --------------------------------------------------------------------------- + +/// Fetch metrics: try local REGISTRY first, then Prometheus. +pub(crate) async fn fetch_metrics_snapshot( + client: &reqwest::Client, + prometheus_url: &Option, +) -> MetricsSnapshot { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + + // First try local REGISTRY (co-located forester mode) + if let Ok(metrics) = get_metrics_json() { + if !is_metrics_empty(&metrics) { + return MetricsSnapshot { + data: metrics, + source: "registry".to_string(), + cached_at: now, + }; + } + } + + // Prometheus fallback + if let Some(ref url) = prometheus_url { + match crate::metrics::query_prometheus_metrics(client, url).await { + Ok(metrics) => { + return MetricsSnapshot { + data: metrics, + source: "prometheus".to_string(), + cached_at: now, + }; + } + Err(e) => { + warn!("Prometheus query failed: {}", e); + } + } + } + + // No data from any source + MetricsSnapshot { + data: MetricsResponse::default(), + source: "none".to_string(), + cached_at: now, + } +} + +/// Fetch compressible counts: try in-memory trackers first, then RPC. +pub(crate) async fn fetch_compressible_snapshot( + trackers: &Option, + rpc_url: &str, +) -> CompressibleSnapshot { + use crate::compressible::traits::CompressibleTracker; + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + + if let Some(ref t) = trackers { + return CompressibleSnapshot { + data: CompressibleResponse { + enabled: true, + ctoken_count: t.ctoken.as_ref().map(|tr| tr.len()), + pda_count: t.pda.as_ref().map(|tr| tr.len()), + mint_count: t.mint.as_ref().map(|tr| tr.len()), + }, + source: "tracker".to_string(), + cached_at: now, + }; + } + + // Standalone mode: RPC with timeout + let fetch_result = tokio::time::timeout( + COMPRESSIBLE_FETCH_TIMEOUT, + crate::compressible::count_compressible_accounts(rpc_url), + ) + .await; + + match fetch_result { + Ok(Ok((ctoken_count, mint_count))) => CompressibleSnapshot { + data: CompressibleResponse { + enabled: true, + ctoken_count: Some(ctoken_count), + pda_count: None, + mint_count: Some(mint_count), + }, + source: "rpc".to_string(), + cached_at: now, + }, + Ok(Err(e)) => { + warn!("RPC compressible count failed: {}", e); + CompressibleSnapshot { + data: CompressibleResponse { + enabled: false, + ctoken_count: None, + pda_count: None, + mint_count: None, + }, + source: "none".to_string(), + cached_at: now, + } + } + Err(_) => { + warn!( + "Compressible count timed out after {}s", + COMPRESSIBLE_FETCH_TIMEOUT.as_secs() + ); + CompressibleSnapshot { + data: CompressibleResponse { + enabled: false, + ctoken_count: None, + pda_count: None, + mint_count: None, + }, + source: "none".to_string(), + cached_at: now, + } + } + } +} + +// --------------------------------------------------------------------------- +// Background provider tasks +// --------------------------------------------------------------------------- + +/// Periodically fetches metrics and publishes via watch channel. +async fn run_metrics_provider( + tx: watch::Sender, + client: reqwest::Client, + prometheus_url: Option, + mut shutdown: broadcast::Receiver<()>, +) { + loop { + let snapshot = fetch_metrics_snapshot(&client, &prometheus_url).await; + if tx.send(snapshot).is_err() { + break; // all receivers dropped + } + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => {} + _ = shutdown.recv() => break, + } + } + info!("Metrics provider stopped"); +} + +/// Periodically fetches compressible counts and publishes via watch channel. +async fn run_compressible_provider( + tx: watch::Sender, + trackers: Option, + rpc_url: String, + mut shutdown: broadcast::Receiver<()>, +) { + // In-memory trackers are cheap (.len()); RPC is expensive (getProgramAccounts) + let interval = if trackers.is_some() { + Duration::from_secs(5) + } else { + Duration::from_secs(30) + }; + + loop { + let snapshot = fetch_compressible_snapshot(&trackers, &rpc_url).await; + if tx.send(snapshot).is_err() { + break; + } + tokio::select! { + _ = tokio::time::sleep(interval) => {} + _ = shutdown.recv() => break, + } + } + info!("Compressible provider stopped"); +} + +// --------------------------------------------------------------------------- +// Server entry point +// --------------------------------------------------------------------------- + /// Spawn the HTTP API server with graceful shutdown support. /// -/// # Arguments -/// * `rpc_url` - RPC URL for forester status endpoint -/// * `port` - Port to bind to -/// * `allow_public_bind` - If true, binds to 0.0.0.0; if false, binds to 127.0.0.1 -/// /// # Returns /// An `ApiServerHandle` that can be used to trigger graceful shutdown -pub fn spawn_api_server(rpc_url: String, port: u16, allow_public_bind: bool) -> ApiServerHandle { +pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let thread_handle = std::thread::spawn(move || { @@ -72,20 +329,59 @@ pub fn spawn_api_server(rpc_url: String, port: u16, allow_public_bind: bool) -> } }; rt.block_on(async move { - let addr = if allow_public_bind { + let addr = if config.allow_public_bind { warn!( - "API server binding to 0.0.0.0:{} - endpoints /status and /metrics/json will be publicly accessible", - port + "API server binding to 0.0.0.0:{} - endpoints will be publicly accessible", + config.port ); - SocketAddr::from(([0, 0, 0, 0], port)) + SocketAddr::from(([0, 0, 0, 0], config.port)) } else { - SocketAddr::from(([127, 0, 0, 1], port)) + SocketAddr::from(([127, 0, 0, 1], config.port)) }; info!("Starting HTTP API server on {}", addr); - let dashboard_route = warp::path::end() - .and(warp::get()) - .map(|| warp::reply::html(DASHBOARD_HTML)); + // Shared HTTP client with timeout for external requests (Prometheus) + let http_client = reqwest::Client::builder() + .timeout(EXTERNAL_HTTP_TIMEOUT) + .build() + .expect("Failed to create HTTP client"); + + // Build trackers from config + let trackers = config + .compressible_state + .as_ref() + .map(|s| CompressibleTrackers { + ctoken: s.ctoken_tracker.clone(), + pda: s.pda_tracker.clone(), + mint: s.mint_tracker.clone(), + }); + + // Create watch channels with empty initial values + let (metrics_tx, metrics_rx) = watch::channel(MetricsSnapshot::empty()); + let (compressible_tx, compressible_rx) = watch::channel(CompressibleSnapshot::empty()); + + // Create shutdown broadcast for providers + let (provider_shutdown_tx, _) = broadcast::channel::<()>(1); + + // Spawn background providers + tokio::spawn(run_metrics_provider( + metrics_tx, + http_client.clone(), + config.prometheus_url.clone(), + provider_shutdown_tx.subscribe(), + )); + + tokio::spawn(run_compressible_provider( + compressible_tx, + trackers, + config.rpc_url.clone(), + provider_shutdown_tx.subscribe(), + )); + + let cors = warp::cors() + .allow_any_origin() + .allow_methods(vec!["GET"]) + .allow_headers(vec!["Content-Type"]); let health_route = warp::path("health").and(warp::get()).map(|| { warp::reply::json(&HealthResponse { @@ -93,8 +389,10 @@ pub fn spawn_api_server(rpc_url: String, port: u16, allow_public_bind: bool) -> }) }); + // --- Status route (unchanged — per-request RPC call) --- + let rpc_url_for_status = config.rpc_url.clone(); let status_route = warp::path("status").and(warp::get()).and_then(move || { - let rpc_url = rpc_url.clone(); + let rpc_url = rpc_url_for_status.clone(); async move { let timeout_duration = Duration::from_secs(STATUS_TIMEOUT_SECS); match tokio::time::timeout(timeout_duration, get_forester_status(&rpc_url)) @@ -102,7 +400,7 @@ pub fn spawn_api_server(rpc_url: String, port: u16, allow_public_bind: bool) -> { Ok(Ok(status)) => Ok::<_, warp::Rejection>(warp::reply::with_status( warp::reply::json(&status), - StatusCode::OK, + warp::http::StatusCode::OK, )), Ok(Err(e)) => { error!("Failed to get forester status: {:?}", e); @@ -111,7 +409,7 @@ pub fn spawn_api_server(rpc_url: String, port: u16, allow_public_bind: bool) -> }; Ok(warp::reply::with_status( warp::reply::json(&error_response), - StatusCode::INTERNAL_SERVER_ERROR, + warp::http::StatusCode::INTERNAL_SERVER_ERROR, )) } Err(_elapsed) => { @@ -127,46 +425,39 @@ pub fn spawn_api_server(rpc_url: String, port: u16, allow_public_bind: bool) -> }; Ok(warp::reply::with_status( warp::reply::json(&error_response), - StatusCode::GATEWAY_TIMEOUT, + warp::http::StatusCode::GATEWAY_TIMEOUT, )) } } } }); - let metrics_route = - warp::path!("metrics" / "json") - .and(warp::get()) - .and_then(|| async move { - match get_metrics_json() { - Ok(metrics) => Ok::<_, warp::Rejection>(warp::reply::with_status( - warp::reply::json(&metrics), - StatusCode::OK, - )), - Err(e) => { - error!("Failed to encode metrics: {}", e); - let error_response = ErrorResponse { - error: format!("Failed to encode metrics: {}", e), - }; - Ok(warp::reply::with_status( - warp::reply::json(&error_response), - StatusCode::INTERNAL_SERVER_ERROR, - )) - } - } - }); + // --- Metrics route (reads latest snapshot from watch channel) --- + let metrics_rx_clone = metrics_rx.clone(); + let metrics_route = warp::path!("metrics" / "json") + .and(warp::get()) + .map(move || warp::reply::json(&*metrics_rx_clone.borrow())); + + // --- Compressible route (reads latest snapshot from watch channel) --- + let compressible_rx_clone = compressible_rx.clone(); + let compressible_route = warp::path("compressible") + .and(warp::get()) + .map(move || warp::reply::json(&*compressible_rx_clone.borrow())); - let routes = dashboard_route - .or(health_route) + let routes = health_route .or(status_route) - .or(metrics_route); + .or(metrics_route) + .or(compressible_route) + .with(cors); warp::serve(routes) .bind(addr) .await - .graceful(async { + .graceful(async move { let _ = shutdown_rx.await; info!("API server received shutdown signal"); + // Signal providers to stop + let _ = provider_shutdown_tx.send(()); }) .run() .await; @@ -180,6 +471,13 @@ pub fn spawn_api_server(rpc_url: String, port: u16, allow_public_bind: bool) -> } } +fn is_metrics_empty(m: &MetricsResponse) -> bool { + m.transactions_processed_total.is_empty() + && m.forester_balances.is_empty() + && m.queue_lengths.is_empty() + && m.last_run_timestamp == 0 +} + fn get_metrics_json() -> Result { use prometheus::proto::MetricType; diff --git a/forester/src/cli.rs b/forester/src/cli.rs index acf0c9ae8c..8da994cc17 100644 --- a/forester/src/cli.rs +++ b/forester/src/cli.rs @@ -13,6 +13,8 @@ pub enum Commands { Start(StartArgs), Status(StatusArgs), Health(HealthArgs), + /// Run a standalone API server for the dashboard (no forester processing) + Dashboard(DashboardArgs), } #[derive(Parser, Clone, Debug)] @@ -274,6 +276,13 @@ pub struct StartArgs { help = "Filter trees by group authority pubkey. Only process trees owned by this authority." )] pub group_authority: Option, + + #[arg( + long, + env = "PROMETHEUS_URL", + help = "Prometheus server URL for querying metrics (e.g. http://prometheus:9090)" + )] + pub prometheus_url: Option, } #[derive(Parser, Clone, Debug)] @@ -345,6 +354,35 @@ pub struct HealthArgs { pub output: String, } +#[derive(Parser, Clone, Debug)] +pub struct DashboardArgs { + #[arg(long, env = "RPC_URL")] + pub rpc_url: String, + + #[arg( + long, + env = "API_SERVER_PORT", + help = "HTTP API server port (default: 8080)", + default_value = "8080" + )] + pub port: u16, + + #[arg( + long, + env = "API_SERVER_PUBLIC_BIND", + help = "Bind API server to 0.0.0.0 instead of 127.0.0.1", + default_value = "false" + )] + pub public_bind: bool, + + #[arg( + long, + env = "PROMETHEUS_URL", + help = "Prometheus server URL for querying metrics (e.g. http://prometheus:9090)" + )] + pub prometheus_url: Option, +} + #[derive(Default, Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] pub enum ProcessorMode { #[clap(name = "v1")] diff --git a/forester/src/compressible/bootstrap_helpers.rs b/forester/src/compressible/bootstrap_helpers.rs index 8ad43ec638..625aa97bf7 100644 --- a/forester/src/compressible/bootstrap_helpers.rs +++ b/forester/src/compressible/bootstrap_helpers.rs @@ -18,7 +18,10 @@ use solana_sdk::pubkey::Pubkey; use tokio::{sync::oneshot, time::timeout}; use tracing::{debug, info}; -use super::config::{DEFAULT_PAGE_SIZE, DEFAULT_PAGINATION_DELAY_MS}; +use super::config::{ + ACCOUNT_TYPE_OFFSET, CTOKEN_ACCOUNT_TYPE_FILTER, DEFAULT_PAGE_SIZE, + DEFAULT_PAGINATION_DELAY_MS, MINT_ACCOUNT_TYPE_FILTER, +}; use crate::Result; const RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); @@ -474,3 +477,111 @@ where Ok(result) } + +/// Count program accounts matching the given filters without downloading +/// account data. +/// +/// Uses `dataSlice: { offset: 0, length: 0 }` so the RPC returns only pubkeys +/// and metadata — cutting bandwidth by ~99% compared to fetching full accounts. +/// Automatically selects standard vs V2 (paginated) API based on localhost +/// detection, same as `run_bootstrap`. +pub async fn count_program_accounts( + rpc_url: &str, + program_id: &Pubkey, + filters: Option>, +) -> Result { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {}", e))?; + + if is_localhost(rpc_url) { + let mut params = json!({ + "encoding": "base64", + "commitment": "confirmed", + "dataSlice": { "offset": 0, "length": 0 } + }); + if let Some(ref f) = filters { + params["filters"] = json!(f); + } + let payload = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "getProgramAccounts", + "params": [program_id.to_string(), params] + }); + let result = send_rpc_request(&client, rpc_url, &payload).await?; + Ok(result.as_array().map(|a| a.len()).unwrap_or(0)) + } else { + let mut total = 0usize; + let mut cursor: Option = None; + let mut page = 0i32; + + loop { + page += 1; + let mut params = json!({ + "encoding": "base64", + "commitment": "confirmed", + "dataSlice": { "offset": 0, "length": 0 }, + "limit": PAGE_SIZE + }); + if let Some(ref f) = filters { + params["filters"] = json!(f); + } + if let Some(ref c) = cursor { + params["paginationKey"] = json!(c); + } + let payload = json!({ + "jsonrpc": "2.0", + "id": page, + "method": "getProgramAccountsV2", + "params": [program_id.to_string(), params] + }); + + let result = send_rpc_request(&client, rpc_url, &payload).await?; + let count = extract_accounts_array(&result) + .map(|a| a.len()) + .unwrap_or(0); + if count == 0 { + break; + } + total += count; + + cursor = extract_pagination_cursor(&result); + if cursor.is_none() { + break; + } + tokio::time::sleep(Duration::from_millis(DEFAULT_PAGINATION_DELAY_MS)).await; + } + + Ok(total) + } +} + +/// Counts compressible CToken and Mint accounts on-chain via RPC. +/// +/// Uses `count_program_accounts` with `dataSlice` to minimize +/// bandwidth. Both queries run concurrently. Errors are propagated so callers +/// can distinguish "0 accounts" from "RPC failure". +pub async fn count_compressible_accounts(rpc_url: &str) -> Result<(usize, usize)> { + let program_id = Pubkey::new_from_array(light_token_interface::LIGHT_TOKEN_PROGRAM_ID); + + let ctoken_filters = vec![json!({"memcmp": { + "offset": ACCOUNT_TYPE_OFFSET, + "bytes": CTOKEN_ACCOUNT_TYPE_FILTER, + "encoding": "base58" + }})]; + + let mint_filters = vec![json!({"memcmp": { + "offset": ACCOUNT_TYPE_OFFSET, + "bytes": MINT_ACCOUNT_TYPE_FILTER, + "encoding": "base58" + }})]; + + let (ctoken_result, mint_result) = tokio::join!( + count_program_accounts(rpc_url, &program_id, Some(ctoken_filters)), + count_program_accounts(rpc_url, &program_id, Some(mint_filters)), + ); + + Ok((ctoken_result?, mint_result?)) +} diff --git a/forester/src/compressible/config.rs b/forester/src/compressible/config.rs index 011535d198..e65b6e8e8b 100644 --- a/forester/src/compressible/config.rs +++ b/forester/src/compressible/config.rs @@ -7,9 +7,6 @@ use solana_sdk::pubkey::Pubkey; // Shared Constants // ============================================================================= -/// Registry program ID for compress_and_close operations -pub const REGISTRY_PROGRAM_ID: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX"; - /// Offset in CToken/Mint account data where account_type byte is stored. /// Used for memcmp filters to identify decompressed accounts. pub const ACCOUNT_TYPE_OFFSET: usize = 165; diff --git a/forester/src/compressible/ctoken/compressor.rs b/forester/src/compressible/ctoken/compressor.rs index 00ee28b579..79d9898927 100644 --- a/forester/src/compressible/ctoken/compressor.rs +++ b/forester/src/compressible/ctoken/compressor.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, sync::Arc}; +use std::sync::Arc; use anchor_lang::{InstructionData, ToAccountMetas}; use forester_utils::rpc_pool::SolanaRpcPool; @@ -20,10 +20,7 @@ use solana_sdk::{ use tracing::{debug, info}; use super::{state::CTokenAccountTracker, types::CTokenAccountState}; -use crate::{ - compressible::{config::REGISTRY_PROGRAM_ID, traits::CompressibleTracker}, - Result, -}; +use crate::{compressible::traits::CompressibleTracker, Result}; /// Compression executor for CToken accounts via the registry program's compress_and_close instruction. pub struct CTokenCompressor { @@ -60,7 +57,7 @@ impl CTokenCompressor { account_states: &[CTokenAccountState], registered_forester_pda: Pubkey, ) -> Result { - let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID)?; + let registry_program_id = light_registry::ID; let compressed_token_program_id = Pubkey::new_from_array(LIGHT_TOKEN_PROGRAM_ID); // Derive compression_authority PDA deterministically (version = 1) @@ -242,14 +239,13 @@ impl CTokenCompressor { self.tracker.remove(&account_state.pubkey); } info!("compress_and_close tx confirmed: {}", signature); + Ok(signature) } else { // Transaction not confirmed - keep accounts in tracker for retry - tracing::warn!( + Err(anyhow::anyhow!( "compress_and_close tx not confirmed: {} - accounts kept in tracker for retry", signature - ); + )) } - - Ok(signature) } } diff --git a/forester/src/compressible/mod.rs b/forester/src/compressible/mod.rs index 849f42990b..4739ee8671 100644 --- a/forester/src/compressible/mod.rs +++ b/forester/src/compressible/mod.rs @@ -7,9 +7,10 @@ pub mod subscriber; pub mod traits; pub mod validation; +pub use bootstrap_helpers::{count_compressible_accounts, count_program_accounts}; pub use config::{ CompressibleConfig, PdaProgramConfig, ACCOUNT_TYPE_OFFSET, CTOKEN_ACCOUNT_TYPE_FILTER, - DEFAULT_PAGE_SIZE, DEFAULT_PAGINATION_DELAY_MS, MINT_ACCOUNT_TYPE_FILTER, REGISTRY_PROGRAM_ID, + DEFAULT_PAGE_SIZE, DEFAULT_PAGINATION_DELAY_MS, MINT_ACCOUNT_TYPE_FILTER, }; pub use ctoken::{ bootstrap_ctoken_accounts, CTokenAccountState, CTokenAccountTracker, CTokenCompressor, diff --git a/forester/src/compressible/subscriber.rs b/forester/src/compressible/subscriber.rs index 096f457bb4..cf10ccead4 100644 --- a/forester/src/compressible/subscriber.rs +++ b/forester/src/compressible/subscriber.rs @@ -162,6 +162,10 @@ impl AccountSubscriber { "{} connection lost (attempt {}), reconnecting in {:?}...", self.config.name, attempt, current_delay ); + + // Reset backoff after a successful connection + current_delay = self.reconnect_config.initial_delay; + attempt = 0; } Err(e) => { attempt += 1; diff --git a/forester/src/compressible/validation.rs b/forester/src/compressible/validation.rs index 7378966084..3bc7dedc9b 100644 --- a/forester/src/compressible/validation.rs +++ b/forester/src/compressible/validation.rs @@ -3,14 +3,10 @@ //! This module provides functions to validate on-chain configuration accounts //! at forester startup, allowing fail-fast behavior on misconfigurations. -use std::str::FromStr; - use anchor_lang::AccountDeserialize; use light_compressible::config::CompressibleConfig as OnChainCompressibleConfig; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::pubkey::Pubkey; -use super::config::REGISTRY_PROGRAM_ID; use crate::Result; /// Validates the on-chain CompressibleConfig for CToken/Mint compression. @@ -28,7 +24,7 @@ use crate::Result; /// - The config state is Inactive (paused) /// - RPC communication fails pub async fn validate_compressible_config(rpc_url: &str) -> Result<()> { - let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID)?; + let registry_program_id = light_registry::ID; // Derive the CompressibleConfig PDA let (config_pda, _) = OnChainCompressibleConfig::derive_v1_config_pda(®istry_program_id); diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 687125b5a6..49da980118 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -795,6 +795,12 @@ impl EpochManager { return Ok(()); } + // Ensure we reset the processing flag when this scope exits + // (whether by normal return, early return, or panic). + let _reset_guard = scopeguard::guard((), |_| { + processing_flag.store(false, Ordering::SeqCst); + }); + let phases = get_epoch_phases(&self.protocol_config, epoch); // Attempt to recover registration info @@ -873,11 +879,6 @@ impl EpochManager { // TODO: implement // self.claim(®istration_info).await?; - // Ensure we reset the processing flag when we're done - let _reset_guard = scopeguard::guard((), |_| { - processing_flag.store(false, Ordering::SeqCst); - }); - info!("Exiting process_epoch"); Ok(()) } @@ -922,7 +923,7 @@ impl EpochManager { "Registration for epoch {} hasn't started yet (current slot: {}, starts at: {}). Waiting {} slots...", epoch, slot, phases.registration.start, slots_to_wait ); - let wait_duration = Duration::from_millis(slots_to_wait * 400); + let wait_duration = slot_duration() * slots_to_wait as u32; sleep(wait_duration).await; } @@ -2485,7 +2486,7 @@ impl EpochManager { items_processed, duration ); - queue_metric_update(epoch_num, items_processed, duration).await; + queue_metric_update(epoch_num, items_processed, duration); self.increment_processed_items_count(epoch_num, items_processed) .await; } @@ -2616,7 +2617,8 @@ impl EpochManager { .collect(); let timeout_slots = slots_until_active.saturating_sub(5); - let timeout_duration = Duration::from_millis((timeout_slots * 400).min(30_000)); + let timeout_duration = + (slot_duration() * timeout_slots as u32).min(Duration::from_secs(30)); info!( "Starting pre-warming for {} trees ({} skipped by config) with {}ms timeout", diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index ab5ff8014b..62812710f5 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -68,6 +68,25 @@ pub struct ForesterStatus { pub slots_until_next_light_slot: Option, /// Total number of light slots in the active phase pub total_light_slots: u64, + /// Total number of trees (active + rolled over) + pub total_trees: usize, + /// Number of active (non-rolled-over) trees + pub active_trees: usize, + /// Number of rolled-over trees + pub rolled_over_trees: usize, + /// Total pending items across all trees + pub total_pending_items: u64, + /// Aggregate queue statistics by tree type + pub aggregate_queue_stats: AggregateQueueStats, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AggregateQueueStats { + pub state_v1_total_pending: u64, + pub state_v2_input_pending: u64, + pub state_v2_output_pending: u64, + pub address_v1_total_pending: u64, + pub address_v2_input_pending: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -77,6 +96,8 @@ pub struct TreeStatus { pub queue: String, pub fullness_percentage: f64, pub next_index: u64, + pub capacity: u64, + pub height: u32, pub threshold: u64, pub is_rolledover: bool, pub queue_length: Option, @@ -212,6 +233,17 @@ pub async fn get_forester_status_with_options( light_slot_length: protocol_config_pda.config.slot_length, slots_until_next_light_slot: None, total_light_slots: 0, + total_trees: 0, + active_trees: 0, + rolled_over_trees: 0, + total_pending_items: 0, + aggregate_queue_stats: AggregateQueueStats { + state_v1_total_pending: 0, + state_v2_input_pending: 0, + state_v2_output_pending: 0, + address_v1_total_pending: 0, + address_v2_input_pending: 0, + }, }); } }; @@ -345,6 +377,46 @@ pub async fn get_forester_status_with_options( } } + // Compute aggregate statistics from tree statuses + let total_trees = tree_statuses.len(); + let active_trees = tree_statuses.iter().filter(|t| !t.is_rolledover).count(); + let rolled_over_trees = tree_statuses.iter().filter(|t| t.is_rolledover).count(); + let total_pending_items: u64 = tree_statuses.iter().filter_map(|t| t.queue_length).sum(); + + let mut aggregate_queue_stats = AggregateQueueStats { + state_v1_total_pending: 0, + state_v2_input_pending: 0, + state_v2_output_pending: 0, + address_v1_total_pending: 0, + address_v2_input_pending: 0, + }; + + for t in &tree_statuses { + match t.tree_type.as_str() { + "StateV1" => { + aggregate_queue_stats.state_v1_total_pending += t.queue_length.unwrap_or(0); + } + "StateV2" => { + if let Some(ref info) = t.v2_queue_info { + aggregate_queue_stats.state_v2_input_pending += + info.input_pending_batches * info.zkp_batch_size; + aggregate_queue_stats.state_v2_output_pending += + info.output_pending_batches * info.zkp_batch_size; + } + } + "AddressV1" => { + aggregate_queue_stats.address_v1_total_pending += t.queue_length.unwrap_or(0); + } + "AddressV2" => { + if let Some(ref info) = t.v2_queue_info { + aggregate_queue_stats.address_v2_input_pending += + info.input_pending_batches * info.zkp_batch_size; + } + } + _ => {} + } + } + Ok(ForesterStatus { slot, current_active_epoch, @@ -362,6 +434,11 @@ pub async fn get_forester_status_with_options( light_slot_length, slots_until_next_light_slot, total_light_slots, + total_trees, + active_trees, + rolled_over_trees, + total_pending_items, + aggregate_queue_stats, }) } @@ -508,133 +585,164 @@ fn parse_tree_status( let mut merkle_account = merkle_account.ok_or_else(|| anyhow::anyhow!("Merkle tree account not found"))?; - let (fullness_percentage, next_index, threshold, queue_length, v2_queue_info) = match tree - .tree_type - { - TreeType::StateV1 => { - let tree_account = StateMerkleTreeAccount::deserialize(&mut &merkle_account.data[8..]) + let (fullness_percentage, next_index, capacity, height, threshold, queue_length, v2_queue_info) = + match tree.tree_type { + TreeType::StateV1 => { + let tree_account = StateMerkleTreeAccount::deserialize( + &mut &merkle_account.data[8..], + ) .map_err(|e| anyhow::anyhow!("Failed to deserialize StateV1 metadata: {}", e))?; - let height = STATE_MERKLE_TREE_HEIGHT; - let capacity = 1u64 << height; - let threshold_val = capacity - .saturating_mul(tree_account.metadata.rollover_metadata.rollover_threshold) - / 100; - - let merkle_tree = - parse_concurrent_merkle_tree_from_bytes::( - &merkle_account.data, - ) + let height = STATE_MERKLE_TREE_HEIGHT; + let capacity = 1u64 << height; + let threshold_val = capacity + .saturating_mul(tree_account.metadata.rollover_metadata.rollover_threshold) + / 100; + + let merkle_tree = parse_concurrent_merkle_tree_from_bytes::< + StateMerkleTreeAccount, + Poseidon, + 26, + >(&merkle_account.data) .map_err(|e| anyhow::anyhow!("Failed to parse StateV1 tree: {:?}", e))?; - let next_index = merkle_tree.next_index() as u64; - let fullness = next_index as f64 / capacity as f64 * 100.0; - - let queue_len = queue_account.and_then(|acc| { - unsafe { parse_hash_set_from_bytes::(&acc.data) } - .ok() - .map(|hs| { - hs.iter() - .filter(|(_, cell)| cell.sequence_number.is_none()) - .count() as u64 - }) - }); + let next_index = merkle_tree.next_index() as u64; + let fullness = next_index as f64 / capacity as f64 * 100.0; - (fullness, next_index, threshold_val, queue_len, None) - } - TreeType::AddressV1 => { - let height = ADDRESS_MERKLE_TREE_HEIGHT; - let capacity = 1u64 << height; - - let threshold_val = queue_account - .as_ref() - .and_then(|acc| QueueAccount::deserialize(&mut &acc.data[8..]).ok()) - .map(|q| { - capacity.saturating_mul(q.metadata.rollover_metadata.rollover_threshold) / 100 - }) - .unwrap_or(0); - - let merkle_tree = parse_indexed_merkle_tree_from_bytes::< - AddressMerkleTreeAccount, - Poseidon, - usize, - 26, - 16, - >(&merkle_account.data) - .map_err(|e| anyhow::anyhow!("Failed to parse AddressV1 tree: {:?}", e))?; - - let next_index = merkle_tree - .next_index() - .saturating_sub(INDEXED_MERKLE_TREE_V1_INITIAL_LEAVES) - as u64; - let fullness = next_index as f64 / capacity as f64 * 100.0; - - let queue_len = queue_account.and_then(|acc| { - unsafe { parse_hash_set_from_bytes::(&acc.data) } - .ok() - .map(|hs| { - hs.iter() - .filter(|(_, cell)| cell.sequence_number.is_none()) - .count() as u64 + let queue_len = queue_account.and_then(|acc| { + unsafe { parse_hash_set_from_bytes::(&acc.data) } + .ok() + .map(|hs| { + hs.iter() + .filter(|(_, cell)| cell.sequence_number.is_none()) + .count() as u64 + }) + }); + + ( + fullness, + next_index, + capacity, + height as u32, + threshold_val, + queue_len, + None, + ) + } + TreeType::AddressV1 => { + let height = ADDRESS_MERKLE_TREE_HEIGHT; + let capacity = 1u64 << height; + + let threshold_val = queue_account + .as_ref() + .and_then(|acc| QueueAccount::deserialize(&mut &acc.data[8..]).ok()) + .map(|q| { + capacity.saturating_mul(q.metadata.rollover_metadata.rollover_threshold) + / 100 }) - }); - - (fullness, next_index, threshold_val, queue_len, None) - } - TreeType::StateV2 => { - let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes( - &mut merkle_account.data, - &tree.merkle_tree.into(), - ) - .map_err(|e| anyhow::anyhow!("Failed to parse StateV2 tree: {:?}", e))?; - - let height = merkle_tree.height as u64; - let capacity = 1u64 << height; - let threshold_val = - (1u64 << height) * merkle_tree.metadata.rollover_metadata.rollover_threshold / 100; - let next_index = merkle_tree.next_index; - let fullness = next_index as f64 / capacity as f64 * 100.0; - - let v2_info = queue_account - .and_then(|mut acc| parse_state_v2_queue_info(&merkle_tree, &mut acc.data).ok()); - let queue_len = v2_info - .as_ref() - .map(|i| (i.input_pending_batches + i.output_pending_batches) * i.zkp_batch_size); - - (fullness, next_index, threshold_val, queue_len, v2_info) - } - TreeType::AddressV2 => { - let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes( - &mut merkle_account.data, - &tree.merkle_tree.into(), - ) - .map_err(|e| anyhow::anyhow!("Failed to parse AddressV2 tree: {:?}", e))?; - - let height = merkle_tree.height as u64; - let capacity = 1u64 << height; - let threshold_val = - capacity * merkle_tree.metadata.rollover_metadata.rollover_threshold / 100; - let fullness = merkle_tree.next_index as f64 / capacity as f64 * 100.0; - - let v2_info = parse_address_v2_queue_info(&merkle_tree); - let queue_len = Some(v2_info.input_pending_batches * v2_info.zkp_batch_size); - - ( - fullness, - merkle_tree.next_index, - threshold_val, - queue_len, - Some(v2_info), - ) - } - TreeType::Unknown => { - warn!( - "Encountered unknown tree type for merkle_tree={}, queue={}", - tree.merkle_tree, tree.queue - ); - (0.0, 0, 0, None, None) - } - }; + .unwrap_or(0); + + let merkle_tree = parse_indexed_merkle_tree_from_bytes::< + AddressMerkleTreeAccount, + Poseidon, + usize, + 26, + 16, + >(&merkle_account.data) + .map_err(|e| anyhow::anyhow!("Failed to parse AddressV1 tree: {:?}", e))?; + + let next_index = merkle_tree + .next_index() + .saturating_sub(INDEXED_MERKLE_TREE_V1_INITIAL_LEAVES) + as u64; + let fullness = next_index as f64 / capacity as f64 * 100.0; + + let queue_len = queue_account.and_then(|acc| { + unsafe { parse_hash_set_from_bytes::(&acc.data) } + .ok() + .map(|hs| { + hs.iter() + .filter(|(_, cell)| cell.sequence_number.is_none()) + .count() as u64 + }) + }); + + ( + fullness, + next_index, + capacity, + height as u32, + threshold_val, + queue_len, + None, + ) + } + TreeType::StateV2 => { + let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes( + &mut merkle_account.data, + &tree.merkle_tree.into(), + ) + .map_err(|e| anyhow::anyhow!("Failed to parse StateV2 tree: {:?}", e))?; + + let height = merkle_tree.height as u64; + let capacity = 1u64 << height; + let threshold_val = (1u64 << height) + * merkle_tree.metadata.rollover_metadata.rollover_threshold + / 100; + let next_index = merkle_tree.next_index; + let fullness = next_index as f64 / capacity as f64 * 100.0; + + let v2_info = queue_account.and_then(|mut acc| { + parse_state_v2_queue_info(&merkle_tree, &mut acc.data).ok() + }); + let queue_len = v2_info.as_ref().map(|i| { + (i.input_pending_batches + i.output_pending_batches) * i.zkp_batch_size + }); + + ( + fullness, + next_index, + capacity, + height as u32, + threshold_val, + queue_len, + v2_info, + ) + } + TreeType::AddressV2 => { + let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes( + &mut merkle_account.data, + &tree.merkle_tree.into(), + ) + .map_err(|e| anyhow::anyhow!("Failed to parse AddressV2 tree: {:?}", e))?; + + let height = merkle_tree.height as u64; + let capacity = 1u64 << height; + let threshold_val = + capacity * merkle_tree.metadata.rollover_metadata.rollover_threshold / 100; + let fullness = merkle_tree.next_index as f64 / capacity as f64 * 100.0; + + let v2_info = parse_address_v2_queue_info(&merkle_tree); + let queue_len = Some(v2_info.input_pending_batches * v2_info.zkp_batch_size); + + ( + fullness, + merkle_tree.next_index, + capacity, + height as u32, + threshold_val, + queue_len, + Some(v2_info), + ) + } + TreeType::Unknown => { + warn!( + "Encountered unknown tree type for merkle_tree={}, queue={}", + tree.merkle_tree, tree.queue + ); + (0.0, 0, 0, 0, 0, None, None) + } + }; Ok(TreeStatus { tree_type: tree.tree_type.to_string(), @@ -642,6 +750,8 @@ fn parse_tree_status( queue: tree.queue.to_string(), fullness_percentage, next_index, + capacity, + height, threshold, is_rolledover: tree.is_rolledover, queue_length, diff --git a/forester/src/main.rs b/forester/src/main.rs index 19d52bc4b1..91cdb9d506 100644 --- a/forester/src/main.rs +++ b/forester/src/main.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use clap::Parser; use forester::{ - api_server::spawn_api_server, + api_server::{spawn_api_server, ApiServerConfig}, cli::{Cli, Commands}, errors::ForesterError, forester_status, @@ -87,11 +87,13 @@ async fn main() -> Result<(), ForesterError> { .map(RateLimiter::new); let rpc_url_for_api: String = config.external_services.rpc_url.to_string(); - let api_server_handle = spawn_api_server( - rpc_url_for_api, - args.api_server_port, - args.api_server_public_bind, - ); + let api_server_handle = spawn_api_server(ApiServerConfig { + rpc_url: rpc_url_for_api, + port: args.api_server_port, + allow_public_bind: args.api_server_public_bind, + compressible_state: None, + prometheus_url: args.prometheus_url.clone(), + }); // Create compressible shutdown channels if compressible is enabled let (shutdown_receiver_compressible, shutdown_receiver_bootstrap) = @@ -142,6 +144,26 @@ async fn main() -> Result<(), ForesterError> { std::process::exit(1); } } + Commands::Dashboard(args) => { + tracing::info!( + "Starting standalone dashboard API server on port {}", + args.port + ); + let api_server_handle = spawn_api_server(ApiServerConfig { + rpc_url: args.rpc_url.clone(), + port: args.port, + allow_public_bind: args.public_bind, + compressible_state: None, + prometheus_url: args.prometheus_url.clone(), + }); + + // Block until Ctrl+C + if let Err(e) = ctrl_c().await { + tracing::error!("Failed to listen for Ctrl+C: {}", e); + } + tracing::info!("Received Ctrl+C, shutting down dashboard API server..."); + api_server_handle.shutdown(); + } } Ok(()) } diff --git a/forester/src/metrics.rs b/forester/src/metrics.rs index e5a4c9aa8c..356ca54480 100644 --- a/forester/src/metrics.rs +++ b/forester/src/metrics.rs @@ -8,7 +8,6 @@ use prometheus::{ Encoder, GaugeVec, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, TextEncoder, }; use reqwest::Client; -use tokio::sync::Mutex; use tracing::{debug, error, log::trace}; use crate::Result; @@ -106,8 +105,8 @@ lazy_static! { error!("Failed to create metric INDEXER_PROOF_COUNT: {:?}", e); std::process::exit(1); }); - static ref METRIC_UPDATES: Mutex> = - Mutex::new(Vec::new()); + static ref METRIC_UPDATES: std::sync::Mutex> = + std::sync::Mutex::new(Vec::new()); } static INIT: Once = Once::new(); @@ -181,13 +180,13 @@ pub fn update_transactions_processed(epoch: u64, count: usize, duration: std::ti ); } -pub async fn queue_metric_update(epoch: u64, count: usize, duration: std::time::Duration) { - let mut updates = METRIC_UPDATES.lock().await; +pub fn queue_metric_update(epoch: u64, count: usize, duration: std::time::Duration) { + let mut updates = METRIC_UPDATES.lock().unwrap_or_else(|e| e.into_inner()); updates.push((epoch, count, duration)); } -pub async fn process_queued_metrics() { - let mut updates = METRIC_UPDATES.lock().await; +pub fn process_queued_metrics() { + let mut updates = METRIC_UPDATES.lock().unwrap_or_else(|e| e.into_inner()); for (epoch, count, duration) in updates.drain(..) { update_transactions_processed(epoch, count, duration); } @@ -243,7 +242,7 @@ pub async fn push_metrics(url: &Option) -> Result<()> { } }; - process_queued_metrics().await; + process_queued_metrics(); update_last_run_timestamp(); @@ -267,6 +266,130 @@ pub async fn push_metrics(url: &Option) -> Result<()> { } } +/// Query a Prometheus server for forester metrics and return a MetricsResponse. +/// +/// Runs PromQL instant queries for the same metrics the in-memory REGISTRY +/// exposes, so the dashboard can show aggregated data from all foresters +/// even when running in standalone mode. +/// +/// Accepts a shared `reqwest::Client` (with timeout pre-configured) to reuse +/// connection pools across calls. +pub async fn query_prometheus_metrics( + client: &Client, + prometheus_url: &str, +) -> Result { + use std::collections::HashMap; + + let base = prometheus_url.trim_end_matches('/'); + + async fn query_instant( + client: &Client, + base: &str, + promql: &str, + ) -> anyhow::Result { + let url = format!("{}/api/v1/query", base); + let resp = client + .get(&url) + .query(&[("query", promql)]) + .send() + .await + .map_err(|e| anyhow::anyhow!("Prometheus request failed: {}", e))?; + + if !resp.status().is_success() { + return Err(anyhow::anyhow!("Prometheus HTTP error: {}", resp.status())); + } + + let body: serde_json::Value = resp + .json() + .await + .map_err(|e| anyhow::anyhow!("Prometheus JSON parse error: {}", e))?; + + if body.get("status").and_then(|s| s.as_str()) != Some("success") { + return Err(anyhow::anyhow!("Prometheus query failed: {:?}", body)); + } + + Ok(body["data"]["result"].clone()) + } + + /// Extract label→value pairs from a Prometheus vector result. + fn extract_label_values(result: &serde_json::Value, label_key: &str) -> Vec<(String, f64)> { + let arr = match result.as_array() { + Some(a) => a, + None => return Vec::new(), + }; + arr.iter() + .filter_map(|entry| { + let label = entry["metric"][label_key].as_str()?.to_string(); + let val_str = entry["value"].as_array()?.get(1)?.as_str()?; + let val: f64 = val_str.parse().ok()?; + Some((label, val)) + }) + .collect() + } + + // Run all queries concurrently + let (tx_total, tx_rate, last_run, balances, queues) = tokio::join!( + query_instant( + client, + base, + "sum(forester_transactions_processed_total) by (epoch)" + ), + query_instant(client, base, "sum(forester_transaction_rate) by (epoch)"), + query_instant(client, base, "max(forester_last_run_timestamp)"), + query_instant(client, base, "forester_sol_balance"), + query_instant(client, base, "queue_length"), + ); + + let mut transactions_processed_total: HashMap = HashMap::new(); + if let Ok(ref v) = tx_total { + for (epoch, val) in extract_label_values(v, "epoch") { + transactions_processed_total.insert(epoch, val as u64); + } + } + + let mut transaction_rate: HashMap = HashMap::new(); + if let Ok(ref v) = tx_rate { + for (epoch, val) in extract_label_values(v, "epoch") { + transaction_rate.insert(epoch, val); + } + } + + let last_run_timestamp: i64 = if let Ok(ref v) = last_run { + v.as_array() + .and_then(|arr| arr.first()) + .and_then(|entry| entry["value"].as_array()) + .and_then(|pair| pair.get(1)) + .and_then(|s| s.as_str()) + .and_then(|s| s.parse::().ok()) + .map(|f| f as i64) + .unwrap_or(0) + } else { + 0 + }; + + let mut forester_balances: HashMap = HashMap::new(); + if let Ok(ref v) = balances { + for (pubkey, val) in extract_label_values(v, "pubkey") { + forester_balances.insert(pubkey, val); + } + } + + let mut queue_lengths: HashMap = HashMap::new(); + if let Ok(ref v) = queues { + for (tree_pubkey, val) in extract_label_values(v, "tree_pubkey") { + queue_lengths.insert(tree_pubkey, val as i64); + } + } + + Ok(crate::api_server::MetricsResponse { + transactions_processed_total, + transaction_rate, + last_run_timestamp, + forester_balances, + queue_lengths, + }) +} + pub async fn metrics_handler() -> Result { use prometheus::Encoder; let encoder = TextEncoder::new(); diff --git a/forester/src/processor/v1/helpers.rs b/forester/src/processor/v1/helpers.rs index 606f3c1cbd..0d00d82c4b 100644 --- a/forester/src/processor/v1/helpers.rs +++ b/forester/src/processor/v1/helpers.rs @@ -299,6 +299,14 @@ pub async fn fetch_proofs_and_create_instructions( Vec::new() }; + if address_proofs.len() != address_items.len() { + return Err(anyhow::anyhow!( + "Address proof count mismatch: requested={}, received={}", + address_items.len(), + address_proofs.len() + )); + } + for (item, proof) in address_items.iter().zip(address_proofs.into_iter()) { proofs.push(MerkleProofType::AddressProof(proof.clone())); let instruction = create_update_address_merkle_tree_instruction( @@ -328,6 +336,14 @@ pub async fn fetch_proofs_and_create_instructions( } // Process state proofs and create instructions + if state_proofs.len() != state_items.len() { + return Err(anyhow::anyhow!( + "State proof count mismatch: requested={}, received={}", + state_items.len(), + state_proofs.len() + )); + } + for (item, proof) in state_items.iter().zip(state_proofs.into_iter()) { proofs.push(MerkleProofType::StateProof(proof.clone())); diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index a91a94c3a9..3de6fb36c0 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -129,8 +129,7 @@ pub async fn send_batched_transactions res, Err(e) => { error!(tree = %tree_accounts.merkle_tree, "Failed to build transaction batch: {:?}", e); - operation_cancel_signal.store(true, Ordering::SeqCst); - break; + continue; } }; trace!(tree = %tree_accounts.merkle_tree, "Built {} transactions in {:?}", transactions_to_send.len(), build_start_time.elapsed()); diff --git a/forester/src/processor/v1/tx_builder.rs b/forester/src/processor/v1/tx_builder.rs index 295da52c07..eea8fcbb8a 100644 --- a/forester/src/processor/v1/tx_builder.rs +++ b/forester/src/processor/v1/tx_builder.rs @@ -128,7 +128,10 @@ impl TransactionBuilder for EpochManagerTransactions { Ok((_, instructions)) => instructions, Err(e) => { // Check if it's a "Record Not Found" error - return if e.to_string().contains("Record Not Found") { + let err_str = e.to_string(); + return if err_str.to_lowercase().contains("record not found") + || err_str.to_lowercase().contains("not found") + { warn!("Record not found in indexer, skipping batch: {}", e); // Return empty transactions but don't propagate the error Ok((vec![], last_valid_block_height)) @@ -148,7 +151,7 @@ impl TransactionBuilder for EpochManagerTransactions { recent_blockhash: *recent_blockhash, compute_unit_price: Some(priority_fee), compute_unit_limit: config.compute_unit_limit, - last_valid_block_hash: last_valid_block_height, + last_valid_block_height, }) .await?; transactions.push(transaction); diff --git a/forester/src/processor/v2/helpers.rs b/forester/src/processor/v2/helpers.rs index d58723c78e..22dd157aad 100644 --- a/forester/src/processor/v2/helpers.rs +++ b/forester/src/processor/v2/helpers.rs @@ -432,7 +432,8 @@ impl StreamingAddressQueue { /// Uses a polling loop to avoid race conditions between the available_elements /// and fetch_complete mutexes. Returns the number of available elements. pub fn wait_for_batch(&self, batch_end: usize) -> usize { - const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5); + const POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120); + let start = std::time::Instant::now(); loop { let available = *lock_recover( @@ -451,7 +452,25 @@ impl StreamingAddressQueue { return available; } - std::thread::sleep(POLL_INTERVAL); + if start.elapsed() > POLL_TIMEOUT { + tracing::warn!( + "wait_for_batch timed out after {:?} waiting for {} elements (available: {})", + POLL_TIMEOUT, + batch_end, + available + ); + return available; + } + + // Use condvar wait with timeout instead of thread::sleep to avoid + // blocking the thread and to wake up promptly when data arrives. + let guard = lock_recover( + &self.available_elements, + "streaming_address_queue.available_elements", + ); + let _ = self + .data_ready + .wait_timeout(guard, std::time::Duration::from_millis(50)); } } diff --git a/forester/src/processor/v2/proof_worker.rs b/forester/src/processor/v2/proof_worker.rs index 69f8bd8c29..b7afeacf0b 100644 --- a/forester/src/processor/v2/proof_worker.rs +++ b/forester/src/processor/v2/proof_worker.rs @@ -1,5 +1,7 @@ use std::{sync::Arc, time::Duration}; +const MAX_CONCURRENT_PROOFS: usize = 64; + use async_channel::Receiver; use light_batched_merkle_tree::merkle_tree::{ InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs, @@ -173,11 +175,15 @@ async fn run_proof_pipeline( job_rx: Receiver, clients: Arc, ) -> crate::Result<()> { + let semaphore = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_PROOFS)); + while let Ok(job) = job_rx.recv().await { let clients = clients.clone(); + let permit = semaphore.clone().acquire_owned().await; // Spawn immediately so we don't block receiving the next job - // while waiting for HTTP submission + // while waiting for HTTP submission. Semaphore bounds concurrency. tokio::spawn(async move { + let _permit = permit; submit_and_poll_proof(clients, job).await; }); } diff --git a/forester/src/processor/v2/root_guard.rs b/forester/src/processor/v2/root_guard.rs index 85dc1d564a..8416204454 100644 --- a/forester/src/processor/v2/root_guard.rs +++ b/forester/src/processor/v2/root_guard.rs @@ -29,7 +29,19 @@ pub fn reconcile_roots( indexer_root: [u8; 32], onchain_root: [u8; 32], ) -> RootReconcileDecision { - if expected_root == [0u8; 32] || indexer_root == expected_root { + if expected_root == [0u8; 32] { + // Uninitialized expected root — proceed but adopt the indexer root. + // Validate that indexer and on-chain agree when possible. + if indexer_root != onchain_root { + tracing::warn!( + "Proceeding with uninitialized expected root, but indexer root ({:?}) != onchain root ({:?}). Indexer may be stale.", + &indexer_root[..4], + &onchain_root[..4], + ); + } + return RootReconcileDecision::Proceed; + } + if indexer_root == expected_root { return RootReconcileDecision::Proceed; } diff --git a/forester/src/rollover/operations.rs b/forester/src/rollover/operations.rs index c6a11a078e..1c5f456874 100644 --- a/forester/src/rollover/operations.rs +++ b/forester/src/rollover/operations.rs @@ -1,4 +1,5 @@ use account_compression::{ + utils::constants::{ADDRESS_MERKLE_TREE_HEIGHT, STATE_MERKLE_TREE_HEIGHT}, AddressMerkleTreeAccount, AddressMerkleTreeConfig, AddressQueueConfig, NullifierQueueConfig, QueueAccount, StateMerkleTreeAccount, StateMerkleTreeConfig, }; @@ -68,7 +69,7 @@ pub async fn get_tree_fullness( .await .map_err(|e| ForesterError::Other(anyhow::anyhow!("{}", e)))?; - let height = 26; + let height = STATE_MERKLE_TREE_HEIGHT; let capacity = 1 << height; let threshold = ((1 << height) * account.metadata.rollover_metadata.rollover_threshold / 100) as usize; @@ -107,7 +108,7 @@ pub async fn get_tree_fullness( ) .await .map_err(|e| ForesterError::Other(anyhow::anyhow!("{}", e)))?; - let height = 26; + let height = ADDRESS_MERKLE_TREE_HEIGHT; let capacity = 1 << height; let threshold = ((1 << height) diff --git a/forester/src/slot_tracker.rs b/forester/src/slot_tracker.rs index 30d9842745..fae2ee97f4 100644 --- a/forester/src/slot_tracker.rs +++ b/forester/src/slot_tracker.rs @@ -10,8 +10,10 @@ use light_client::rpc::Rpc; use tokio::time::{sleep, Duration}; use tracing::{error, trace}; +const SLOT_DURATION_MS: u64 = 400; + pub fn slot_duration() -> Duration { - Duration::from_nanos(solana_sdk::genesis_config::GenesisConfig::default().ns_per_slot() as u64) + Duration::from_millis(SLOT_DURATION_MS) } fn slot_duration_secs() -> f64 { diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index 29e93feb0f..5f5822199f 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -23,7 +23,7 @@ pub struct CreateSmartTransactionConfig { pub compute_unit_price: Option, pub compute_unit_limit: Option, pub instructions: Vec, - pub last_valid_block_hash: u64, + pub last_valid_block_height: u64, } /// Poll a transaction to check whether it has been confirmed @@ -139,5 +139,5 @@ pub async fn create_smart_transaction( let mut tx = Transaction::new_with_payer(&final_instructions, Some(&payer_pubkey)); tx.sign(&[&config.payer], config.recent_blockhash); - Ok((tx, config.last_valid_block_hash)) + Ok((tx, config.last_valid_block_height)) } diff --git a/forester/src/telemetry.rs b/forester/src/telemetry.rs index 6845fca9cc..2d881406b2 100644 --- a/forester/src/telemetry.rs +++ b/forester/src/telemetry.rs @@ -2,7 +2,10 @@ use std::sync::Once; use env_logger::Env; use tracing_appender::rolling::{RollingFileAppender, Rotation}; -use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +static LOG_GUARD: std::sync::OnceLock = + std::sync::OnceLock::new(); static INIT: Once = Once::new(); @@ -28,28 +31,21 @@ pub fn setup_telemetry() { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); - let stdout_env_filter = - EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); - let stdout_layer = fmt::Layer::new() .with_writer(std::io::stdout) - .with_ansi(true) - .with_filter(stdout_env_filter); + .with_ansi(true); if let Some(file_appender) = file_appender { - let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); - let file_env_filter = EnvFilter::new("info"); - let file_layer = fmt::Layer::new() - .with_writer(non_blocking) - .with_filter(file_env_filter); + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + let _ = LOG_GUARD.set(guard); + + let file_layer = fmt::Layer::new().with_writer(non_blocking); tracing_subscriber::registry() .with(stdout_layer) .with(file_layer) .with(env_filter) .init(); - - std::mem::forget(_guard); } else { tracing_subscriber::registry() .with(stdout_layer) diff --git a/forester/static/dashboard.html b/forester/static/dashboard.html deleted file mode 100644 index 12f8ef6c25..0000000000 --- a/forester/static/dashboard.html +++ /dev/null @@ -1,1236 +0,0 @@ - - - - - - Forester Dashboard - - - -
-
-

Forester Dashboard

-
- -
- - Loading -
-
-
- -
-
-
-
- Loading status... -
-
-
- - - - diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index c816a40ccc..3772539302 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -84,6 +84,7 @@ async fn test_priority_fee_request() { api_server_port: 8080, group_authority: None, light_pda_programs: vec![], + prometheus_url: None, }; let config = ForesterConfig::new_for_start(&args).expect("Failed to create config");