diff --git a/apps/koa-esm/src/index.js b/apps/koa-esm/src/index.js index 218b159..b001bc1 100644 --- a/apps/koa-esm/src/index.js +++ b/apps/koa-esm/src/index.js @@ -77,6 +77,46 @@ router.get('/fetch', async (ctx) => { ctx.body = data }) +// SSE test endpoint - server side +router.get('/sse-server', async (ctx) => { + ctx.set('Content-Type', 'text/event-stream') + ctx.set('Cache-Control', 'no-cache') + ctx.set('Connection', 'keep-alive') + + ctx.status = 200 + ctx.respond = false + + const res = ctx.res + let count = 0 + const interval = setInterval(() => { + count++ + res.write(`event: message\nid: ${count}\ndata: {"count": ${count}, "time": "${new Date().toISOString()}"}\n\n`) + if (count >= 5) { + clearInterval(interval) + res.end() + } + }, 500) +}) + +// SSE test endpoint - fetch SSE from external source +router.get('/sse-fetch', async (ctx) => { + // Fetch SSE from our own server + const response = await fetch('http://localhost:3001/sse-server') + + // Consume the stream to capture events + const reader = response.body.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value, { stream: true }) + } + + ctx.body = { message: 'SSE stream consumed', data: result } +}) + app.use(router.routes()) app.listen(3001) diff --git a/packages/network-debugger/src/core/fetch.test.ts b/packages/network-debugger/src/core/fetch.test.ts index 9e3c795..5b0e936 100644 --- a/packages/network-debugger/src/core/fetch.test.ts +++ b/packages/network-debugger/src/core/fetch.test.ts @@ -535,5 +535,235 @@ describe('core/fetch.ts', () => { expect(mockFetch).toHaveBeenCalledWith('https://example.com/api', options) }) }) + + describe('SSE (Server-Sent Events) 处理', () => { + // Helper to create a mock SSE response + function createMockSSEResponse(events: string[], options: { delay?: number } = {}) { + const { delay = 0 } = options + const mockHeaders = new Headers({ + 'content-type': 'text/event-stream' + }) + + // Create a mock ReadableStream + let readerIndex = 0 + const encoder = new TextEncoder() + const chunks = events.map((event) => encoder.encode(event)) + + const mockReader = { + read: vi.fn().mockImplementation(async () => { + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)) + } + if (readerIndex >= chunks.length) { + return { done: true, value: undefined } + } + const value = chunks[readerIndex] + readerIndex++ + return { done: false, value } + }) + } + + const mockBody = { + getReader: vi.fn().mockReturnValue(mockReader) + } + + return { + status: 200, + headers: mockHeaders, + body: mockBody, + clone: vi.fn().mockReturnValue({ + body: mockBody, + arrayBuffer: vi.fn().mockResolvedValue(new ArrayBuffer(0)) + }) + } as unknown as Response + } + + test('正确识别 SSE 响应 (text/event-stream)', async () => { + const mockResponse = createMockSSEResponse(['data: hello\n\n']) + const mockFetch = vi.fn().mockResolvedValue(mockResponse) + const { mockMainProcess } = createMockMainProcess() + + const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never) + const result = await proxyFn('https://example.com/sse') + + // SSE 响应应该直接返回,不等待流结束 + expect(result).toBe(mockResponse) + }) + + test('解析简单的 SSE 事件', async () => { + const mockResponse = createMockSSEResponse(['data: hello world\n\n']) + const mockFetch = vi.fn().mockResolvedValue(mockResponse) + const mockSend = vi.fn() + const mockSendRequest = vi.fn().mockReturnThis() + const mockMainProcess = { + sendRequest: mockSendRequest, + send: mockSend + } + + const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never) + await proxyFn('https://example.com/sse') + + // 等待流处理完成 + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(mockSend).toHaveBeenCalledWith({ + type: 'eventSourceMessage', + data: { + requestId: expect.any(String), + eventName: 'message', + eventId: '', + data: 'hello world' + } + }) + }) + + test('解析带有 event 类型的 SSE 事件', async () => { + const mockResponse = createMockSSEResponse(['event: custom\ndata: test data\n\n']) + const mockFetch = vi.fn().mockResolvedValue(mockResponse) + const mockSend = vi.fn() + const mockSendRequest = vi.fn().mockReturnThis() + const mockMainProcess = { + sendRequest: mockSendRequest, + send: mockSend + } + + const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never) + await proxyFn('https://example.com/sse') + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(mockSend).toHaveBeenCalledWith({ + type: 'eventSourceMessage', + data: { + requestId: expect.any(String), + eventName: 'custom', + eventId: '', + data: 'test data' + } + }) + }) + + test('解析带有 id 的 SSE 事件', async () => { + const mockResponse = createMockSSEResponse(['id: 123\ndata: with id\n\n']) + const mockFetch = vi.fn().mockResolvedValue(mockResponse) + const mockSend = vi.fn() + const mockSendRequest = vi.fn().mockReturnThis() + const mockMainProcess = { + sendRequest: mockSendRequest, + send: mockSend + } + + const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never) + await proxyFn('https://example.com/sse') + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(mockSend).toHaveBeenCalledWith({ + type: 'eventSourceMessage', + data: { + requestId: expect.any(String), + eventName: 'message', + eventId: '123', + data: 'with id' + } + }) + }) + + test('处理多行 data', async () => { + const mockResponse = createMockSSEResponse(['data: line1\ndata: line2\n\n']) + const mockFetch = vi.fn().mockResolvedValue(mockResponse) + const mockSend = vi.fn() + const mockSendRequest = vi.fn().mockReturnThis() + const mockMainProcess = { + sendRequest: mockSendRequest, + send: mockSend + } + + const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never) + await proxyFn('https://example.com/sse') + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(mockSend).toHaveBeenCalledWith({ + type: 'eventSourceMessage', + data: { + requestId: expect.any(String), + eventName: 'message', + eventId: '', + data: 'line1\nline2' + } + }) + }) + + test('处理多个连续的 SSE 事件', async () => { + const mockResponse = createMockSSEResponse([ + 'data: first\n\n', + 'data: second\n\n', + 'data: third\n\n' + ]) + const mockFetch = vi.fn().mockResolvedValue(mockResponse) + const mockSend = vi.fn() + const mockSendRequest = vi.fn().mockReturnThis() + const mockMainProcess = { + sendRequest: mockSendRequest, + send: mockSend + } + + const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never) + await proxyFn('https://example.com/sse') + + await new Promise((resolve) => setTimeout(resolve, 100)) + + const eventSourceCalls = mockSend.mock.calls.filter( + (call) => call[0]?.type === 'eventSourceMessage' + ) + expect(eventSourceCalls.length).toBe(3) + expect(eventSourceCalls[0][0].data.data).toBe('first') + expect(eventSourceCalls[1][0].data.data).toBe('second') + expect(eventSourceCalls[2][0].data.data).toBe('third') + }) + + test('SSE 流结束后发送 endRequest', async () => { + const mockResponse = createMockSSEResponse(['data: test\n\n']) + const mockFetch = vi.fn().mockResolvedValue(mockResponse) + const mockSend = vi.fn() + const mockSendRequest = vi.fn().mockReturnThis() + const mockMainProcess = { + sendRequest: mockSendRequest, + send: mockSend + } + + const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never) + await proxyFn('https://example.com/sse') + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(mockSendRequest).toHaveBeenCalledWith('endRequest', expect.any(RequestDetail)) + }) + + test('非 SSE 响应不触发 eventSourceMessage', async () => { + const mockResponse = createMockResponse({ + headers: { 'content-type': 'application/json' }, + body: '{"key": "value"}' + }) + const mockFetch = vi.fn().mockResolvedValue(mockResponse) + const mockSend = vi.fn() + const mockSendRequest = vi.fn().mockReturnThis() + const mockMainProcess = { + sendRequest: mockSendRequest, + send: mockSend + } + + const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never) + await proxyFn('https://example.com/api') + + await new Promise((resolve) => setTimeout(resolve, 50)) + + const eventSourceCalls = mockSend.mock.calls.filter( + (call) => call[0]?.type === 'eventSourceMessage' + ) + expect(eventSourceCalls.length).toBe(0) + }) + }) }) }) diff --git a/packages/network-debugger/src/core/fetch.ts b/packages/network-debugger/src/core/fetch.ts index b4311a0..c930212 100644 --- a/packages/network-debugger/src/core/fetch.ts +++ b/packages/network-debugger/src/core/fetch.ts @@ -54,12 +54,160 @@ export function fetchProxyFactory(fetchFn: typeof fetch, mainProcess: MainProces } } +/** + * Check if the response is a Server-Sent Events (SSE) stream + */ +function isEventStream(response: Response): boolean { + const contentType = response.headers.get('content-type') || '' + return contentType.includes('text/event-stream') +} + +/** + * Handle SSE (Server-Sent Events) streaming response + */ +async function handleEventStreamResponse( + response: Response, + requestDetail: RequestDetail, + mainProcess: MainProcess +): Promise { + const body = response.clone().body + if (!body) { + return + } + + const reader = body.getReader() + const decoder = new TextDecoder() + let buffer = '' + const allChunks: Uint8Array[] = [] + + // Send initial response info + mainProcess.sendRequest('updateRequest', requestDetail) + + try { + while (true) { + const { done, value } = await reader.read() + + if (done) { + break + } + + if (value) { + allChunks.push(value) + buffer += decoder.decode(value, { stream: true }) + + // Parse SSE events from buffer + const lines = buffer.split('\n') + buffer = lines.pop() || '' // Keep incomplete line in buffer + + let currentEventType = 'message' + let currentEventData = '' + let currentEventId = '' + + for (const line of lines) { + if (line.startsWith('event:')) { + currentEventType = line.slice(6).trim() + } else if (line.startsWith('data:')) { + currentEventData += (currentEventData ? '\n' : '') + line.slice(5).trim() + } else if (line.startsWith('id:')) { + currentEventId = line.slice(3).trim() + } else if (line === '') { + // Empty line means end of event + if (currentEventData) { + mainProcess.send({ + type: 'eventSourceMessage', + data: { + requestId: requestDetail.id, + eventName: currentEventType, + eventId: currentEventId, + data: currentEventData + } + }) + } + // Reset for next event + currentEventType = 'message' + currentEventData = '' + currentEventId = '' + } + } + } + } + + // Handle any remaining data in buffer + if (buffer.trim()) { + const lines = buffer.split('\n') + let currentEventType = 'message' + let currentEventData = '' + let currentEventId = '' + + for (const line of lines) { + if (line.startsWith('event:')) { + currentEventType = line.slice(6).trim() + } else if (line.startsWith('data:')) { + currentEventData += (currentEventData ? '\n' : '') + line.slice(5).trim() + } else if (line.startsWith('id:')) { + currentEventId = line.slice(3).trim() + } + } + + if (currentEventData) { + mainProcess.send({ + type: 'eventSourceMessage', + data: { + requestId: requestDetail.id, + eventName: currentEventType, + eventId: currentEventId, + data: currentEventData + } + }) + } + } + + // Combine all chunks for final response data + const totalLength = allChunks.reduce((acc, chunk) => acc + chunk.length, 0) + const combinedArray = new Uint8Array(totalLength) + let offset = 0 + for (const chunk of allChunks) { + combinedArray.set(chunk, offset) + offset += chunk.length + } + + requestDetail.responseData = Buffer.from(combinedArray) + requestDetail.responseInfo.dataLength = totalLength + requestDetail.responseInfo.encodedDataLength = totalLength + } catch (error) { + // Stream was aborted or errored, still try to save what we have + if (allChunks.length > 0) { + const totalLength = allChunks.reduce((acc, chunk) => acc + chunk.length, 0) + const combinedArray = new Uint8Array(totalLength) + let offset = 0 + for (const chunk of allChunks) { + combinedArray.set(chunk, offset) + offset += chunk.length + } + requestDetail.responseData = Buffer.from(combinedArray) + requestDetail.responseInfo.dataLength = totalLength + requestDetail.responseInfo.encodedDataLength = totalLength + } + } finally { + requestDetail.requestEndTime = Date.now() + mainProcess.sendRequest('updateRequest', requestDetail).sendRequest('endRequest', requestDetail) + } +} + function fetchResponseHandlerFactory(requestDetail: RequestDetail, mainProcess: MainProcess) { return (response: Response) => { requestDetail.requestEndTime = new Date().getTime() requestDetail.responseHeaders = headersToObject(response.headers) requestDetail.responseStatusCode = response.status || 0 + // Check if this is an SSE stream + if (isEventStream(response)) { + // Handle SSE asynchronously without blocking the response + handleEventStreamResponse(response, requestDetail, mainProcess) + return response + } + + // Handle regular response response .clone() .arrayBuffer() diff --git a/packages/network-debugger/src/fork/module/network/index.test.ts b/packages/network-debugger/src/fork/module/network/index.test.ts index 2a69e23..8ca9a61 100644 --- a/packages/network-debugger/src/fork/module/network/index.test.ts +++ b/packages/network-debugger/src/fork/module/network/index.test.ts @@ -1000,4 +1000,168 @@ describe('fork/module/network/index.ts', () => { expect(mockDevtool.send).toHaveBeenCalled() }) }) + + describe('Server-Sent Events (SSE) 支持', () => { + test('eventSourceMessage 处理器发送 CDP 消息', async () => { + const { networkPlugin } = await import('./index') + + const mockDevtool = createMockDevtool() + const mockCore = createMockCore() + registeredHandlers.clear() + + networkPlugin({ + devtool: mockDevtool, + core: mockCore, + plugins: [] + }) + + // 先注册一个请求 + const testRequest = createTestRequest({ + id: 'sse-request-id', + url: 'http://example.com/sse' + }) + + const initRequestHandlers = registeredHandlers.get('initRequest') + initRequestHandlers![0]({ data: testRequest, id: undefined }) + + // 发送 eventSourceMessage + const eventSourceHandlers = registeredHandlers.get('eventSourceMessage') + expect(eventSourceHandlers).toBeDefined() + + eventSourceHandlers![0]({ + data: { + requestId: 'sse-request-id', + eventName: 'message', + eventId: '1', + data: 'Hello SSE' + }, + id: undefined + }) + + expect(mockDevtool.send).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'Network.eventSourceMessageReceived', + params: expect.objectContaining({ + requestId: 'sse-request-id', + eventName: 'message', + eventId: '1', + data: 'Hello SSE' + }) + }) + ) + }) + + test('eventSourceMessage 处理自定义事件类型', async () => { + const { networkPlugin } = await import('./index') + + const mockDevtool = createMockDevtool() + const mockCore = createMockCore() + registeredHandlers.clear() + + networkPlugin({ + devtool: mockDevtool, + core: mockCore, + plugins: [] + }) + + // 先注册一个请求 + const testRequest = createTestRequest({ + id: 'sse-custom-event-id', + url: 'http://example.com/sse' + }) + + const initRequestHandlers = registeredHandlers.get('initRequest') + initRequestHandlers![0]({ data: testRequest, id: undefined }) + + // 发送自定义事件类型的 eventSourceMessage + const eventSourceHandlers = registeredHandlers.get('eventSourceMessage') + + eventSourceHandlers![0]({ + data: { + requestId: 'sse-custom-event-id', + eventName: 'customEvent', + eventId: '42', + data: '{"status": "complete"}' + }, + id: undefined + }) + + expect(mockDevtool.send).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'Network.eventSourceMessageReceived', + params: expect.objectContaining({ + requestId: 'sse-custom-event-id', + eventName: 'customEvent', + eventId: '42', + data: '{"status": "complete"}' + }) + }) + ) + }) + + test('eventSourceMessage 对未知请求 ID 不发送消息', async () => { + const { networkPlugin } = await import('./index') + + const mockDevtool = createMockDevtool() + const mockCore = createMockCore() + registeredHandlers.clear() + + networkPlugin({ + devtool: mockDevtool, + core: mockCore, + plugins: [] + }) + + // 不注册请求,直接发送 eventSourceMessage + const eventSourceHandlers = registeredHandlers.get('eventSourceMessage') + + eventSourceHandlers![0]({ + data: { + requestId: 'unknown-request-id', + eventName: 'message', + eventId: '', + data: 'test' + }, + id: undefined + }) + + // 不应该发送 eventSourceMessageReceived + const eventSourceCalls = mockDevtool.send.mock.calls.filter( + (call: unknown[]) => + (call[0] as { method?: string })?.method === 'Network.eventSourceMessageReceived' + ) + expect(eventSourceCalls.length).toBe(0) + }) + + test('text/event-stream 类型被识别为 EventSource', async () => { + const { networkPlugin } = await import('./index') + + const mockDevtool = createMockDevtool() + const mockCore = createMockCore() + registeredHandlers.clear() + + networkPlugin({ + devtool: mockDevtool, + core: mockCore, + plugins: [] + }) + + const testRequest = createTestRequest({ + id: 'sse-type-test-id', + responseHeaders: { 'content-type': 'text/event-stream' } + }) + + const endRequestHandlers = registeredHandlers.get('endRequest') + endRequestHandlers![0]({ data: testRequest, id: undefined }) + + expect(mockDevtool.send).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'Network.responseReceived', + params: expect.objectContaining({ + type: 'EventSource' + }) + }) + ) + }) + }) }) diff --git a/packages/network-debugger/src/fork/module/network/index.ts b/packages/network-debugger/src/fork/module/network/index.ts index 4811469..73f229a 100644 --- a/packages/network-debugger/src/fork/module/network/index.ts +++ b/packages/network-debugger/src/fork/module/network/index.ts @@ -30,6 +30,9 @@ export const networkPlugin = createPlugin('network', ({ devtool, core }) => { const contentType = headers.getHeader('content-type') || 'text/plain; charset=utf-8' const type = (() => { + if (/text\/event-stream/.test(contentType)) { + return 'EventSource' + } if (/image/.test(contentType)) { return 'Image' } @@ -222,6 +225,32 @@ export const networkPlugin = createPlugin('network', ({ devtool, core }) => { } }) + // Handle Server-Sent Events (SSE) messages + useHandler<{ + requestId: string + eventName: string + eventId: string + data: string + }>('eventSourceMessage', ({ data }) => { + const { requestId, eventName, eventId, data: eventData } = data + const request = getRequest(requestId) + if (!request) { + return + } + + devtool.updateTimestamp() + devtool.send({ + method: 'Network.eventSourceMessageReceived', + params: { + requestId, + timestamp: devtool.timestamp, + eventName, + eventId, + data: eventData + } + }) + }) + return { getRequest, resourceService