Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions apps/koa-esm/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,46 @@ router.get('/fetch', async (ctx) => {
ctx.body = data
})

// SSE test endpoint - server side
router.get('/sse-server', async (ctx) => {
ctx.set('Content-Type', 'text/event-stream')
ctx.set('Cache-Control', 'no-cache')
ctx.set('Connection', 'keep-alive')

ctx.status = 200
ctx.respond = false

const res = ctx.res
let count = 0
const interval = setInterval(() => {
count++
res.write(`event: message\nid: ${count}\ndata: {"count": ${count}, "time": "${new Date().toISOString()}"}\n\n`)
if (count >= 5) {
clearInterval(interval)
res.end()
}
}, 500)
})

// SSE test endpoint - fetch SSE from external source
router.get('/sse-fetch', async (ctx) => {
// Fetch SSE from our own server
const response = await fetch('http://localhost:3001/sse-server')

// Consume the stream to capture events
const reader = response.body.getReader()
const decoder = new TextDecoder()
let result = ''

while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value, { stream: true })
}

ctx.body = { message: 'SSE stream consumed', data: result }
})

app.use(router.routes())
app.listen(3001)

Expand Down
230 changes: 230 additions & 0 deletions packages/network-debugger/src/core/fetch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,5 +535,235 @@ describe('core/fetch.ts', () => {
expect(mockFetch).toHaveBeenCalledWith('https://example.com/api', options)
})
})

describe('SSE (Server-Sent Events) 处理', () => {
// Helper to create a mock SSE response
function createMockSSEResponse(events: string[], options: { delay?: number } = {}) {
const { delay = 0 } = options
const mockHeaders = new Headers({
'content-type': 'text/event-stream'
})

// Create a mock ReadableStream
let readerIndex = 0
const encoder = new TextEncoder()
const chunks = events.map((event) => encoder.encode(event))

const mockReader = {
read: vi.fn().mockImplementation(async () => {
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay))
}
if (readerIndex >= chunks.length) {
return { done: true, value: undefined }
}
const value = chunks[readerIndex]
readerIndex++
return { done: false, value }
})
}

const mockBody = {
getReader: vi.fn().mockReturnValue(mockReader)
}

return {
status: 200,
headers: mockHeaders,
body: mockBody,
clone: vi.fn().mockReturnValue({
body: mockBody,
arrayBuffer: vi.fn().mockResolvedValue(new ArrayBuffer(0))
})
} as unknown as Response
}

test('正确识别 SSE 响应 (text/event-stream)', async () => {
const mockResponse = createMockSSEResponse(['data: hello\n\n'])
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
const { mockMainProcess } = createMockMainProcess()

const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
const result = await proxyFn('https://example.com/sse')

// SSE 响应应该直接返回,不等待流结束
expect(result).toBe(mockResponse)
})

test('解析简单的 SSE 事件', async () => {
const mockResponse = createMockSSEResponse(['data: hello world\n\n'])
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
const mockSend = vi.fn()
const mockSendRequest = vi.fn().mockReturnThis()
const mockMainProcess = {
sendRequest: mockSendRequest,
send: mockSend
}

const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
await proxyFn('https://example.com/sse')

// 等待流处理完成
await new Promise((resolve) => setTimeout(resolve, 50))

expect(mockSend).toHaveBeenCalledWith({
type: 'eventSourceMessage',
data: {
requestId: expect.any(String),
eventName: 'message',
eventId: '',
data: 'hello world'
}
})
})

test('解析带有 event 类型的 SSE 事件', async () => {
const mockResponse = createMockSSEResponse(['event: custom\ndata: test data\n\n'])
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
const mockSend = vi.fn()
const mockSendRequest = vi.fn().mockReturnThis()
const mockMainProcess = {
sendRequest: mockSendRequest,
send: mockSend
}

const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
await proxyFn('https://example.com/sse')

await new Promise((resolve) => setTimeout(resolve, 50))

expect(mockSend).toHaveBeenCalledWith({
type: 'eventSourceMessage',
data: {
requestId: expect.any(String),
eventName: 'custom',
eventId: '',
data: 'test data'
}
})
})

test('解析带有 id 的 SSE 事件', async () => {
const mockResponse = createMockSSEResponse(['id: 123\ndata: with id\n\n'])
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
const mockSend = vi.fn()
const mockSendRequest = vi.fn().mockReturnThis()
const mockMainProcess = {
sendRequest: mockSendRequest,
send: mockSend
}

const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
await proxyFn('https://example.com/sse')

await new Promise((resolve) => setTimeout(resolve, 50))

expect(mockSend).toHaveBeenCalledWith({
type: 'eventSourceMessage',
data: {
requestId: expect.any(String),
eventName: 'message',
eventId: '123',
data: 'with id'
}
})
})

test('处理多行 data', async () => {
const mockResponse = createMockSSEResponse(['data: line1\ndata: line2\n\n'])
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
const mockSend = vi.fn()
const mockSendRequest = vi.fn().mockReturnThis()
const mockMainProcess = {
sendRequest: mockSendRequest,
send: mockSend
}

const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
await proxyFn('https://example.com/sse')

await new Promise((resolve) => setTimeout(resolve, 50))

expect(mockSend).toHaveBeenCalledWith({
type: 'eventSourceMessage',
data: {
requestId: expect.any(String),
eventName: 'message',
eventId: '',
data: 'line1\nline2'
}
})
})

test('处理多个连续的 SSE 事件', async () => {
const mockResponse = createMockSSEResponse([
'data: first\n\n',
'data: second\n\n',
'data: third\n\n'
])
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
const mockSend = vi.fn()
const mockSendRequest = vi.fn().mockReturnThis()
const mockMainProcess = {
sendRequest: mockSendRequest,
send: mockSend
}

const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
await proxyFn('https://example.com/sse')

await new Promise((resolve) => setTimeout(resolve, 100))

const eventSourceCalls = mockSend.mock.calls.filter(
(call) => call[0]?.type === 'eventSourceMessage'
)
expect(eventSourceCalls.length).toBe(3)
expect(eventSourceCalls[0][0].data.data).toBe('first')
expect(eventSourceCalls[1][0].data.data).toBe('second')
expect(eventSourceCalls[2][0].data.data).toBe('third')
})

test('SSE 流结束后发送 endRequest', async () => {
const mockResponse = createMockSSEResponse(['data: test\n\n'])
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
const mockSend = vi.fn()
const mockSendRequest = vi.fn().mockReturnThis()
const mockMainProcess = {
sendRequest: mockSendRequest,
send: mockSend
}

const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
await proxyFn('https://example.com/sse')

await new Promise((resolve) => setTimeout(resolve, 50))

expect(mockSendRequest).toHaveBeenCalledWith('endRequest', expect.any(RequestDetail))
})

test('非 SSE 响应不触发 eventSourceMessage', async () => {
const mockResponse = createMockResponse({
headers: { 'content-type': 'application/json' },
body: '{"key": "value"}'
})
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
const mockSend = vi.fn()
const mockSendRequest = vi.fn().mockReturnThis()
const mockMainProcess = {
sendRequest: mockSendRequest,
send: mockSend
}

const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
await proxyFn('https://example.com/api')

await new Promise((resolve) => setTimeout(resolve, 50))

const eventSourceCalls = mockSend.mock.calls.filter(
(call) => call[0]?.type === 'eventSourceMessage'
)
expect(eventSourceCalls.length).toBe(0)
})
})
})
})
Loading