diff --git a/example/grpc_client.js b/example/grpc_client.js index 7cb230a..59b60b0 100644 --- a/example/grpc_client.js +++ b/example/grpc_client.js @@ -18,7 +18,10 @@ async function test() { serverHost: 'http://localhost:12200', }); await consumer.ready(); - const r = await consumer.invoke('SayHello', [{ name: 'peter' }]); + const r = await Promise.all([ + consumer.invoke('SayHello', [{ name: 'peter' }]), + consumer.invoke('SayHi', [{ name: 'tony' }]), + ]); console.log(r); } diff --git a/example/grpc_server.js b/example/grpc_server.js index e360933..c62d8e0 100644 --- a/example/grpc_server.js +++ b/example/grpc_server.js @@ -22,6 +22,12 @@ server.addService({ message: `hello ${req.name} from sofa-rpc-node`, }; }, + async SayHi(req) { + console.log(req); + return { + message: `hi ${req.name} from sofa-rpc-node`, + }; + }, }); server.start(); diff --git a/example/proto/helloworld.proto b/example/proto/helloworld.proto index be878ce..a413c9d 100644 --- a/example/proto/helloworld.proto +++ b/example/proto/helloworld.proto @@ -25,6 +25,7 @@ package helloworld; service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} + rpc SayHi (HelloRequest) returns (HelloReply) {} } // The request message containing the user's name. diff --git a/lib/client/connection/grpc/call_stream.js b/lib/client/connection/grpc/call_stream.js index eed666d..3adbe44 100644 --- a/lib/client/connection/grpc/call_stream.js +++ b/lib/client/connection/grpc/call_stream.js @@ -206,6 +206,7 @@ class Http2CallStream extends Duplex { } this._endCall(err); this.destroy(); + this.emit('close'); } _attachHttp2Stream(stream) { diff --git a/lib/server/grpc/response.js b/lib/server/grpc/response.js index d579b01..947673b 100644 --- a/lib/server/grpc/response.js +++ b/lib/server/grpc/response.js @@ -29,11 +29,37 @@ class GRpcResponse { async send(res) { const methodInfo = this.methodInfo; - if (methodInfo.responseType) { + let http2Header = { + 'content-type': 'application/grpc', + 'grpc-accept-encoding': 'identity', + 'accept-encoding': 'identity', + }; + let grpcMeta = { + 'grpc-status': 0, + 'grpc-message': 'OK', + }; + let data = ''; + if (res.isError) { + this.meta.rt = Date.now() - this.meta.start; + this.meta.resultCode = '02'; + http2Header = { + ...http2Header, + ':status': 500, // make a unknown http2 status + }; + grpcMeta = { + ...grpcMeta, + 'grpc-status': 2, // UNKNOWN + 'grpc-message': res.errorMsg, + }; + data = ''; + } else if (methodInfo.responseType) { const responseEncodeStart = Date.now(); const responseType = methodInfo.resolvedResponseType; - const buf = responseType.encode(responseType.fromObject(res.appResponse)).finish(); + const buf = responseType + .encode(responseType.fromObject(res.appResponse)) + .finish(); const resSize = buf.length; + io.reset(); io.put(0); io.putInt(resSize); io.put(buf); @@ -42,21 +68,22 @@ class GRpcResponse { this.meta.data = buf; this.meta.rt = Date.now() - this.meta.start; this.meta.resSize = resSize + 5; - - this.stream.respond({ - ':status': 200, - 'content-type': 'application/grpc', - 'grpc-accept-encoding': 'identity', - 'accept-encoding': 'identity', - }, { waitForTrailers: true }); - this.stream.on('wantTrailers', () => { - this.stream.sendTrailers({ - 'grpc-status': 0, - 'grpc-message': 'OK', - }); - }); - this.stream.end(io.array()); + http2Header = { ...http2Header, ':status': 200 }; + grpcMeta = { + ...grpcMeta, + 'grpc-status': 0, + 'grpc-message': 'OK', + }; + data = io.array(); + } + if (!this.stream || this.stream.destroyed) { + return; } + this.stream.respond(http2Header, { waitForTrailers: true }); + this.stream.on('wantTrailers', () => { + this.stream.sendTrailers(grpcMeta); + }); + this.stream.end(data); } } diff --git a/lib/server/server.js b/lib/server/server.js index 71018ed..fc85325 100644 --- a/lib/server/server.js +++ b/lib/server/server.js @@ -271,15 +271,17 @@ class RpcServer extends Base { this.emit('request', { req, ctx }); try { if (!service) { - res.meta.resultCode = '02'; - await res.send({ - isError: true, - errorMsg: 'not found service: ' + id, - appResponse: null, - }); - } else { - await service.invoke(ctx, req, res); + throw new Error('not found service: ' + id); } + await service.invoke(ctx, req, res); + } catch (e) { + this.emit('error', e); + res.meta.resultCode = '02'; + await res.send({ + isError: true, + errorMsg: e.message, + appResponse: e, + }); } finally { this.emit('response', { ctx, req, res }); } diff --git a/lib/server/service.js b/lib/server/service.js index 88e64c0..b30a53e 100644 --- a/lib/server/service.js +++ b/lib/server/service.js @@ -130,11 +130,12 @@ class RpcService extends Base { try { result = await method.apply(ctx, args); } catch (err) { + data.isError = true; + data.errorMsg = err.message; // 如果框架自身抛出的异常, error name 可以定义为 SystemError, 方便甄别 // 这个异常直接放到协议头上 if (err.name === 'SystemError') { res.meta.resultCode = err.resultCode || '02'; - data.errorMsg = err.message; } else { // 业务异常 res.meta.resultCode = err.resultCode || '01'; diff --git a/test/client/consumer.test.js b/test/client/consumer.test.js index 64ad8d1..cc8ff1b 100644 --- a/test/client/consumer.test.js +++ b/test/client/consumer.test.js @@ -389,7 +389,7 @@ describe('test/client/consumer.test.js', () => { assert(!req.meta.crcEnable); assert(req.meta.timeout === 3000); assert(req.meta.reqSize === 271); - assert(req.meta.resSize === 113); + assert(req.meta.resSize === 112); assert(req.meta.rt >= 0); assert(res && res.error && res.error.message.includes('mock error')); diff --git a/test/fixtures/proto/helloworld.proto b/test/fixtures/proto/helloworld.proto index be878ce..795ae2c 100644 --- a/test/fixtures/proto/helloworld.proto +++ b/test/fixtures/proto/helloworld.proto @@ -25,6 +25,7 @@ package helloworld; service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} + rpc SayHi (HiRequest) returns (HiReply) {} } // The request message containing the user's name. @@ -36,3 +37,11 @@ message HelloRequest { message HelloReply { string message = 1; } + +message HiRequest { + string name = 1; +} + +message HiReply { + string message = 1; +} diff --git a/test/grpc/index.test.js b/test/grpc/index.test.js index 656a0c5..f64347a 100644 --- a/test/grpc/index.test.js +++ b/test/grpc/index.test.js @@ -21,16 +21,28 @@ describe('test/grpc/index.test.js', () => { logger, port, }); - server.addService({ - interfaceName: 'helloworld.Greeter', - }, { - async SayHello(req) { - await sleep(200); - return { - message: `hello ${req.name}`, - }; + server.addService( + { + interfaceName: 'helloworld.Greeter', }, - }); + { + async SayHello(req) { + await sleep(200); + return { + message: `hello ${req.name}`, + }; + }, + async SayHi(req) { + await sleep(100); + if (req.name === 'throw') { + throw new Error('test error message'); + } + return { + message: `hi ${req.name}`, + }; + }, + } + ); await server.start(); client = new GRpcClient({ proto, @@ -85,4 +97,34 @@ describe('test/grpc/index.test.js', () => { .invoke('SayHello', { name: 'world' }) .expect({ message: 'hello world' }); }); + + it('should invoke gRPC multi times works fine', async function() { + const helloResult = await request(server) + .service('helloworld.Greeter') + .invoke('SayHello', { name: 'world' }); + helloResult.consumer.close(); + assert.deepEqual(helloResult.data, { message: 'hello world' }); + const hiResult = await request(server) + .service('helloworld.Greeter') + .invoke('SayHi', { name: 'world' }); + hiResult.consumer.close(); + assert.deepEqual(hiResult.data, { message: 'hi world' }); + }); + + it('should get error response when service throw exception', async function() { + const consumer = client.createConsumer({ + interfaceName: 'helloworld.Greeter', + serverHost: 'http://localhost:' + port, + }); + await consumer.ready(); + try { + await consumer.invoke('SayHi', [{ name: 'throw' }], {}); + assert(false); + } catch (e) { + assert(e.code === 2); + assert(e.message === 'test error message'); + } finally { + consumer.close(); + } + }); }); diff --git a/test/test/index.test.js b/test/test/index.test.js index 53f3c2a..0a42629 100644 --- a/test/test/index.test.js +++ b/test/test/index.test.js @@ -413,7 +413,7 @@ describe('test/test/index.test.js', () => { assert(false, 'should no run here'); }) .catch(err => { - assert(err.message === 'Error: mock error'); + assert(err.message.includes('Error: mock error')); }); }); });