Conversation
All mocking moved to the coordinator. The tool service and dispatcher service work together there.
Pull request overview
This PR introduces a new “Dataplayer” flow backed by a coordinator service (gRPC on the UI server + HTTP/SSE wrappers for the browser), updates the UI to route users to the new dataplayer page, and keeps a legacy “dispatcher” run path under a new deprecated route.
Changes:
- Add coordinator-backed dataplayer UI (`DataplayerPage`) and client wrappers (`coordinatorApi.ts`), plus server-side endpoints in `server.ts`.
- Add a Node gRPC client and generated protobuf TS artifacts for coordinator integration.
- Replace/remove legacy dispatcher route/API pieces and add a deprecated legacy run page.
Reviewed changes
Copilot reviewed 19 out of 23 changed files in this pull request and generated 22 comments.
Show a summary per file
| File | Description |
|---|---|
| src/types/dispatcher.ts | Removes dispatcher-related shared types (currently still referenced by legacy code). |
| src/types/dataplayerTypes.ts | Adds dataplayer/coordinator request & response types. |
| src/routes.ts | Replaces dispatcher route with dataplayer + legacy dataplayer routes. |
| src/pages/LandingPage.tsx | Adds a new “play” input that opens the dataplayer page. |
| src/pages/DataplayerPage.tsx | New dataplayer UI: file listing, tool matching/search, slot mapping, SSE monitoring. |
| src/pages/DataplayerLegacyRunPage.tsx | Keeps legacy run path but rewired to coordinator “*-old” endpoints. |
| src/lib/utils.ts | Changes dev/prod logging behavior (currently forced to dev). |
| src/lib/server/grpcClient.ts | Adds gRPC client logic for datasets/tools/launching. |
| src/lib/server/generated/google/protobuf/timestamp.ts | Generated protobuf runtime types. |
| src/lib/server/generated/google/protobuf/struct.ts | Generated protobuf runtime types. |
| src/lib/dispatcherApi.ts | Removes old dispatcher API client. |
| src/lib/deprecatedDispatcherApi.ts | Reintroduces dispatcher submission/status logic as “deprecated”. |
| src/lib/coordinatorApi.ts | Adds browser-facing API wrapper for coordinator endpoints. |
| src/components/SearchResultItem.tsx | Updates CTA buttons to “Play” (new dataplayer) + legacy run button. |
| src/components/DataplayInput.tsx | Adds new landing-page input component for dataplayer. |
| server.ts | Adds coordinator HTTP endpoints + SSE bridging; adds proxy route. |
| package.json / package-lock.json | Adds grpc/jsonwebtoken/tooling deps. |
| eslint.config.js | Updates ESLint config (currently has an overriding languageOptions bug). |
| README.md | Documents coordinator/gRPC codegen + submodule workflow. |
| .gitmodules | Adds req-packager submodule. |
Comments suppressed due to low confidence (3)
package.json:44
- Generated gRPC code imports `@bufbuild/protobuf/wire` at runtime (via `src/lib/server/generated/...`), but `@bufbuild/protobuf` is not listed in `dependencies`. If production installs omit devDependencies, the server will crash on startup. Add `@bufbuild/protobuf` to `dependencies` (not just as a transitive devDependency).
```json
"dependencies": {
  "@grpc/grpc-js": "^1.14.3",
  "@radix-ui/react-toast": "^1.2.15",
  "@radix-ui/react-tooltip": "^1.2.8",
  "@react-router/express": "^7.13.0",
  "@react-router/fs-routes": "^7.13.0",
  "@react-router/node": "^7.13.0",
  "@react-router/serve": "^7.13.0",
  "@tailwindcss/vite": "^4.1.17",
  "@tanstack/react-query": "^5.90.21",
  "class-variance-authority": "^0.7.1",
  "compression": "^1.8.1",
  "express": "^5.2.1",
  "http-proxy-middleware": "^3.0.5",
  "isbot": "^5.1.35",
  "jsonwebtoken": "^9.0.3",
  "lucide-react": "^0.563.0",
  "morgan": "^1.10.1",
  "pm2": "^6.0.14",
  "react": "^19.2.4",
  "react-dom": "^19.2.4",
  "react-icons": "^5.5.0",
  "react-router": "^7.13.0",
  "tailwind-merge": "^3.4.0",
  "tailwindcss-animate": "^1.0.7"
},
```
eslint.config.js:25
- This ESLint config sets `languageOptions` twice in the same config object. The second `languageOptions` (line 21+) overrides the first one, so the configured parser and parserOptions are effectively dropped. Merge these into a single `languageOptions` block so parser + parserOptions + globals are all applied together.
```js
{
  languageOptions: {
    parser: '@typescript-eslint/parser',
    parserOptions: {
      ecmaVersion: 2020,
      sourceType: 'module',
      ecmaFeatures: { jsx: true },
    },
    globals: globals.browser,
  },
  extends: [js.configs.recommended, ...tseslint.configs.recommended],
  files: ['**/*.{ts,tsx}'],
  languageOptions: {
    ecmaVersion: 2020,
    globals: globals.browser,
  },
  plugins: {
```
src/pages/DataplayerLegacyRunPage.tsx:18
`DataplayerLegacyRunPage` still imports from `../types/dispatcher`, but `src/types/dispatcher.ts` was removed. This will cause module resolution failures. Either restore the dispatcher types module or update this page (and related utils) to import the types from their new location.
```tsx
const datasetTitle = searchParams.get('title');
const datasetHandle = searchParams.get('datasetId');
const navigate = useNavigate();

// Step management
const [currentStep, setCurrentStep] = useState<StepType>('select-analysis');
// tool uuid
const [selectedToolId, setSelectedToolId] = useState<string>(null);

// File management
const [files, setFiles] = useState<FileMeta[]>([]);
const [fileParameterMappings, setFileParameterMappings] = useState<Record<number, string>>({});
const [loadingFiles, setLoadingFiles] = useState(false);
const [filesError, setFilesError] = useState<string | null>(null);

const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);

useEffect(() => {
  const load = async () => {
    console.log("Start loading");
    try {
      setLoading(true);
      const files = await fetchFilesMetaByDatasetHandle(datasetHandle);
      setFiles(files);
    } catch (err) {
      console.error(err);
      setError("Failed to fetch files");
    } finally {
      setLoading(false);
      console.log("Finished loading");
    }
  };

  load();
}, [datasetHandle]);
```
datasetHandle is read from the URL and can be null, but it’s passed directly into fetchFilesMetaByDatasetHandle(). This can result in requests like handle=null and confusing UI states. Add a guard that shows a clear error (or redirects) when datasetId is missing/invalid.
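A minimal sketch of such a guard (the helper name `validateDatasetHandle` and its exact rejection rules are illustrative, not part of the PR):

```typescript
// Hypothetical guard: validate the datasetId query param before fetching.
// Returns a usable handle, or null so the caller can render an error state.
function validateDatasetHandle(raw: string | null): string | null {
  if (raw === null) return null;
  const handle = raw.trim();
  // Reject empty strings and the literal "null"/"undefined" strings that
  // can leak in from template-interpolation bugs upstream.
  if (handle === "" || handle === "null" || handle === "undefined") return null;
  return handle;
}
```

The effect could then bail out early with `setFilesError(...)` (or a redirect) when the guard returns null, instead of issuing a `handle=null` request.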
```tsx
<a href={hit._id} target="_blank" rel="noopener noreferrer"
   aria-label={`View dataset ${hit.title}`}
   className="inline-flex items-center justify-center gap-1 rounded-md bg-green-600 px-3 py-1.5 text-sm font-medium text-white shadow-sm hover:bg-green-700 focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-green-600 transition-colors">
   aria-label={`Redinect to the source of dataset ${hit.title}`}
```
Typo in aria-label: “Redinect” → “Redirect”.
Suggested change:

```diff
-aria-label={`Redinect to the source of dataset ${hit.title}`}
+aria-label={`Redirect to the source of dataset ${hit.title}`}
```
```ts
client.getArtifact(grpc_req, (err, response) => {
  if (err) {
    console.error(err);
    return;
  }
  let callbackUrl: string | undefined;

  if (response.eoscInline) {
    callbackUrl = response.eoscInline.callbackUrl;
  } else if (response.hosted) {
    callbackUrl = response.hosted.callbackUrl;
  } else {
    console.error("No entry_point found");
    return;
  }
  v_callbackUrl = callbackUrl;
  res.send(v_callbackUrl);
});
```
In /api/coordinator/tasks-result/:taskId, if getArtifact returns an error or no entry point, the handler logs and returns without sending a response. This leaves the HTTP request hanging. Respond with an appropriate status (e.g., 502/500) in the error branch and when no entry point is found.
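One way to shape the fix, sketched with stand-in types (`ResLike`, the helper name `handleArtifactResult`, and the 502 response bodies are assumptions, not the PR's code):

```typescript
// Minimal Express-like response surface, just enough for the sketch.
interface ResLike {
  status(code: number): ResLike;
  send(body?: string): ResLike;
}

interface ArtifactResponse {
  eoscInline?: { callbackUrl: string };
  hosted?: { callbackUrl: string };
}

// Mirrors the getArtifact callback under review, but every branch
// terminates the HTTP request instead of silently returning.
function handleArtifactResult(
  res: ResLike,
  err: Error | null,
  response?: ArtifactResponse,
): void {
  if (err) {
    console.error(err);
    res.status(502).send("coordinator getArtifact failed"); // was: silent return
    return;
  }
  const callbackUrl =
    response?.eoscInline?.callbackUrl ?? response?.hosted?.callbackUrl;
  if (!callbackUrl) {
    res.status(502).send("No entry_point found"); // was: silent return
    return;
  }
  res.send(callbackUrl);
}
```

Extracting the callback body into a function like this also makes the "every path responds" invariant easy to unit-test.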
```ts
process.on("SIGINT", () => {
  console.log("SIGINT received: closing gRPC client...");
  // closes HTTP/2 channel to avoid resource leaking
  db_client.close();
  process.exit();
});

process.on("SIGTERM", () => {
  console.log("SIGTERM received: closing gRPC client...");
  db_client.close();
```
db_client can be null when SIGINT/SIGTERM handlers run (e.g., no RPCs made yet), but the code calls db_client.close() unconditionally. This will throw during shutdown. Use a null-check/optional chaining and consider also closing the other singleton clients created in this module.
Suggested change:

```diff
-process.on("SIGINT", () => {
-  console.log("SIGINT received: closing gRPC client...");
-  // closes HTTP/2 channel to avoid resource leaking
-  db_client.close();
-  process.exit();
-});
-process.on("SIGTERM", () => {
-  console.log("SIGTERM received: closing gRPC client...");
-  db_client.close();
+function closeGrpcClients(): void {
+  // closes HTTP/2 channel to avoid resource leaking
+  db_client?.close();
+  db_client = null;
+}
+process.on("SIGINT", () => {
+  console.log("SIGINT received: closing gRPC client...");
+  closeGrpcClients();
+  process.exit();
+});
+process.on("SIGTERM", () => {
+  console.log("SIGTERM received: closing gRPC client...");
+  closeGrpcClients();
```
```ts
  // XXX: if I deploy the grpc server with client in the same NAT, I can use insecure channel, but if goes to ethz deployment, should not.
  // Should use SSL for msg over wire.
  const client = new DataplayerServiceClient(GRPC_TARGET, createInsecureChannel());

  const msgSlotsMapping = {} as {string: FileEntry};
  for (const k in slotsMapping) {
    msgSlotsMapping[k] = fileMetaToFileEntry(slotsMapping[k]);
  }

  const request: LaunchToolRequest = {
    toolId,
    slotsMapping: msgSlotsMapping,
  }

  return new Promise((resolve, reject) => {
    client.launchTool(request, metadata, (error, response) => {
      if (error) {
        // TODO: log needed?
        reject(error);
        return;
      }

      resolve(response.handlerId);
    });
  });
}
```
msgSlotsMapping is typed as {} as {string: FileEntry}, which isn’t a valid index signature type and defeats type-checking. Use a proper map type (e.g. Record<string, FileEntry>) and consider closing/reusing the gRPC client after launchTool completes to avoid leaking channels.
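A sketch of the typed version (the `FileMeta`/`FileEntry` shapes below are stand-ins for illustration; the PR's real types will differ):

```typescript
// Stand-in shapes for illustration only.
interface FileMeta { filename: string; size: number }
interface FileEntry { name: string; bytes: number }

function fileMetaToFileEntry(meta: FileMeta): FileEntry {
  return { name: meta.filename, bytes: meta.size };
}

// Record<string, FileEntry> is a real index signature, so assignments and
// lookups are type-checked (whereas `{string: FileEntry}` declares a single
// property literally named "string").
function buildSlotsMapping(
  slotsMapping: Record<string, FileMeta>,
): Record<string, FileEntry> {
  const msgSlotsMapping: Record<string, FileEntry> = {};
  for (const k of Object.keys(slotsMapping)) {
    msgSlotsMapping[k] = fileMetaToFileEntry(slotsMapping[k]);
  }
  return msgSlotsMapping;
}
```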
```ts
      // XXX: should I close the client??
      client.close();
      resolve(files);
    });

    stream.on("error", (err: grpc.ServiceError) => {
      client.close();
```
fetchDatasetFilesFromDatadaseByUUID (typo in name) calls client.close() on end/error, but client comes from getDatasetClient() which returns a singleton. Closing it here can break subsequent calls that reuse the singleton. Either don’t close singleton clients per call, or stop using a singleton for this RPC.
Suggested change:

```diff
-      // XXX: should I close the client??
-      client.close();
-      resolve(files);
-    });
-    stream.on("error", (err: grpc.ServiceError) => {
-      client.close();
+      // Do not close the dataset client here; getDatasetClient() returns
+      // a shared singleton that must remain available for subsequent RPCs.
+      resolve(files);
+    });
+    stream.on("error", (err: grpc.ServiceError) => {
```
```ts
import {
  DispatcherResult,
  FileMetrixFile,
  TaskStatus,
  TaskStatusResponse,
} from "../types/dispatcher";
import { DISPATCHER_CONFIGS, updateOnedataForTarget } from "./dispatcherUtils";
import { fetchWithTimeout } from "./utils";
```
This file imports dispatcher types from ../types/dispatcher, but src/types/dispatcher.ts was removed in this PR. That will break builds for the legacy dispatcher path. Either keep the dispatcher type definitions (e.g., reintroduce src/types/dispatcher.ts) or move these legacy types into a new module and update imports accordingly.
Suggested change:

```diff
-import {
-  DispatcherResult,
-  FileMetrixFile,
-  TaskStatus,
-  TaskStatusResponse,
-} from "../types/dispatcher";
-import { DISPATCHER_CONFIGS, updateOnedataForTarget } from "./dispatcherUtils";
-import { fetchWithTimeout } from "./utils";
+import { DISPATCHER_CONFIGS, updateOnedataForTarget } from "./dispatcherUtils";
+import { fetchWithTimeout } from "./utils";
+
+type TaskStatus = string;
+
+interface DispatcherResult {
+  task_id: string;
+  [key: string]: unknown;
+}
+
+interface FileMetrixFile {
+  ro_crate_extensions: unknown;
+  [key: string]: unknown;
+}
+
+interface TaskStatusResponse {
+  status: TaskStatus;
+  task_id?: string;
+  [key: string]: unknown;
+}
```
```ts
    const tools = await searchToolsByText(toolSearchText);
    setQueryToolResults(tools);
  }

  load();
}, [debouncedSearch, toolSearchText]);
```
The debounced search effect triggers on debouncedSearch, but the API call uses toolSearchText instead. This defeats the debounce and can issue requests with a newer/older value than the effect intended. Use debouncedSearch as the query inside the effect and remove the extra dependency on toolSearchText.
Suggested change:

```diff
-    const tools = await searchToolsByText(toolSearchText);
-    setQueryToolResults(tools);
-  }
-  load();
-}, [debouncedSearch, toolSearchText]);
+    const tools = await searchToolsByText(debouncedSearch);
+    setQueryToolResults(tools);
+  }
+  load();
+}, [debouncedSearch]);
```
```tsx
</td>
<td className="px-6 py-4">
  <select
    onChange={(e) => handleSlotSet(Number(e.target.value), param)}
    className="block w-full px-3 py-2 text-sm border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500"
  >
    <option key="none" value="None">--select to set parameter--</option>
    {
      files.map((file, fileIndex) => (
        <option key={fileIndex} value={fileIndex}>
          {file.filename}
        </option>
      ))
    }
  </select>
```
In the slot→file mapping table, the “none” option has value="None" but onChange does Number(e.target.value), so selecting it will pass NaN as the file index and corrupt mappings. Also, because the slot name is fixed (param), there’s currently no way to unassign a slot once set. Use a numeric sentinel for “none” (or a separate handler) and update handleSlotSet/mapping model so slots can be cleared safely.
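One possible shape for the fix, using a numeric sentinel (`NONE_SENTINEL` and `handleSlotChange` are hypothetical names; the PR's actual mapping state may be shaped differently):

```typescript
// Numeric sentinel for the "--select--" option; -1 can never be a valid
// files[] index.
const NONE_SENTINEL = -1;

// Pure helper: returns the next slot -> fileIndex mapping. Sentinel or
// unparsable values clear the slot instead of storing NaN.
function handleSlotChange(
  mappings: Record<string, number>,
  param: string,
  rawValue: string,
): Record<string, number> {
  const fileIndex = Number(rawValue);
  if (!Number.isInteger(fileIndex) || fileIndex === NONE_SENTINEL) {
    const next = { ...mappings };
    delete next[param];
    return next;
  }
  return { ...mappings, [param]: fileIndex };
}
```

The "none" option would then be rendered as `<option value={NONE_SENTINEL}>` so every option value parses to an integer, and re-selecting it unassigns the slot.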
```ts
app.get("/api/coordinator/task-status/:taskId", async (req, res) => {
  // TODO: token or session cookie to prevent access from anywhere.

  const { taskId } = req.params;

  // SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // XXX: per user connection as well, so the token or the user name need to be an argument passed in
  const client = getDataplayerClient();
  const grpc_req: MonitorStateRequest = {
    id: taskId,
  };
  const stream = client.monitorState(grpc_req);
  let lastState: ToolState_State | null = null;

  stream.on("data", (resp: MonitorStateResponse) => {
    // when state machine transit to the end state.
    const toolState = resp.status;
    const currentState = toolState?.state ?? null;

    // Stream progress
    if (currentState && currentState != lastState) {
      lastState = currentState;

      let stateStr = null;
      // XXX: ??? why currentState don't match?
      // if (currentState === ToolState_State.PREPARING) {
      //   stateStr = "PREPARING";
      // }
      if (currentState === ToolState_State.READY) {
        stateStr = "READY";
      }
      if (currentState === ToolState_State.DROPPED) {
        stateStr = "DROPPED";
      }
      if (currentState === ToolState_State.UNRECOGNIZED) {
        stateStr = "UNRECOGNIZED";
      }
      res.write(`event: state\ndata: ${
        JSON.stringify({
          state: stateStr,
          message: toolState?.log,
        })
      }\n\n`);
    }
  });

  stream.on("end", () => {
    res.end();
  });
});
```
The SSE endpoint for task status doesn’t handle client disconnects or gRPC stream errors. Without req.on('close', ...) to cancel/close the gRPC stream (and a stream.on('error', ...) handler), the server can leak listeners/streams if the browser navigates away or the RPC fails.
…coordinator # Conflicts: # src/lib/dispatcherApi.ts