Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion example/grpc_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ async function test() {
serverHost: 'http://localhost:12200',
});
await consumer.ready();
const r = await consumer.invoke('SayHello', [{ name: 'peter' }]);
const r = await Promise.all([
consumer.invoke('SayHello', [{ name: 'peter' }]),
consumer.invoke('SayHi', [{ name: 'tony' }]),
]);
console.log(r);
}

Expand Down
6 changes: 6 additions & 0 deletions example/grpc_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ server.addService({
message: `hello ${req.name} from sofa-rpc-node`,
};
},
async SayHi(req) {
console.log(req);
return {
message: `hi ${req.name} from sofa-rpc-node`,
};
},
});

server.start();
1 change: 1 addition & 0 deletions example/proto/helloworld.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package helloworld;
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHi (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
Expand Down
1 change: 1 addition & 0 deletions lib/client/connection/grpc/call_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class Http2CallStream extends Duplex {
}
this._endCall(err);
this.destroy();
this.emit('close');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个应该不用主动触发吧? destroy 以后会自动触发 close

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的 我的 node 是 v8.x ,在这个版本里 destroy 后收不到 close 事件,所以我手动 emit 了一个

Copy link
Contributor Author

@smile21 smile21 Mar 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可能和这个 pr 有关 nodejs/node#23654 v8.x 目前还是有这个问题

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加一个注释?

}

_attachHttp2Stream(stream) {
Expand Down
59 changes: 43 additions & 16 deletions lib/server/grpc/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,37 @@ class GRpcResponse {

async send(res) {
const methodInfo = this.methodInfo;
if (methodInfo.responseType) {
let http2Header = {
'content-type': 'application/grpc',
'grpc-accept-encoding': 'identity',
'accept-encoding': 'identity',
};
let grpcMeta = {
'grpc-status': 0,
'grpc-message': 'OK',
};
let data = '';
if (res.isError) {
this.meta.rt = Date.now() - this.meta.start;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rt 可以统一设置

this.meta.resultCode = '02';
http2Header = {
...http2Header,
':status': 500, // make a unknown http2 status
};
grpcMeta = {
...grpcMeta,
'grpc-status': 2, // UNKNOWN
'grpc-message': res.errorMsg,
};
data = '';
} else if (methodInfo.responseType) {
const responseEncodeStart = Date.now();
const responseType = methodInfo.resolvedResponseType;
const buf = responseType.encode(responseType.fromObject(res.appResponse)).finish();
const buf = responseType
.encode(responseType.fromObject(res.appResponse))
.finish();
const resSize = buf.length;
io.reset();
io.put(0);
io.putInt(resSize);
io.put(buf);
Expand All @@ -42,21 +68,22 @@ class GRpcResponse {
this.meta.data = buf;
this.meta.rt = Date.now() - this.meta.start;
this.meta.resSize = resSize + 5;

this.stream.respond({
':status': 200,
'content-type': 'application/grpc',
'grpc-accept-encoding': 'identity',
'accept-encoding': 'identity',
}, { waitForTrailers: true });
this.stream.on('wantTrailers', () => {
this.stream.sendTrailers({
'grpc-status': 0,
'grpc-message': 'OK',
});
});
this.stream.end(io.array());
http2Header = { ...http2Header, ':status': 200 };
grpcMeta = {
...grpcMeta,
'grpc-status': 0,
'grpc-message': 'OK',
};
data = io.array();
}
if (!this.stream || this.stream.destroyed) {
return;
}
this.stream.respond(http2Header, { waitForTrailers: true });
this.stream.on('wantTrailers', () => {
this.stream.sendTrailers(grpcMeta);
});
this.stream.end(data);
}
}

Expand Down
18 changes: 10 additions & 8 deletions lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,17 @@ class RpcServer extends Base {
this.emit('request', { req, ctx });
try {
if (!service) {
res.meta.resultCode = '02';
await res.send({
isError: true,
errorMsg: 'not found service: ' + id,
appResponse: null,
});
} else {
await service.invoke(ctx, req, res);
throw new Error('not found service: ' + id);
}
await service.invoke(ctx, req, res);
} catch (e) {
this.emit('error', e);
res.meta.resultCode = '02';
await res.send({
isError: true,
errorMsg: e.message,
appResponse: e,
});
} finally {
this.emit('response', { ctx, req, res });
}
Expand Down
3 changes: 2 additions & 1 deletion lib/server/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,12 @@ class RpcService extends Base {
try {
result = await method.apply(ctx, args);
} catch (err) {
data.isError = true;
data.errorMsg = err.message;
// 如果框架自身抛出的异常, error name 可以定义为 SystemError, 方便甄别
// 这个异常直接放到协议头上
if (err.name === 'SystemError') {
res.meta.resultCode = err.resultCode || '02';
data.errorMsg = err.message;
} else {
// 业务异常
res.meta.resultCode = err.resultCode || '01';
Expand Down
2 changes: 1 addition & 1 deletion test/client/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ describe('test/client/consumer.test.js', () => {
assert(!req.meta.crcEnable);
assert(req.meta.timeout === 3000);
assert(req.meta.reqSize === 271);
assert(req.meta.resSize === 113);
assert(req.meta.resSize === 112);
assert(req.meta.rt >= 0);

assert(res && res.error && res.error.message.includes('mock error'));
Expand Down
9 changes: 9 additions & 0 deletions test/fixtures/proto/helloworld.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package helloworld;
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHi (HiRequest) returns (HiReply) {}
}

// The request message containing the user's name.
Expand All @@ -36,3 +37,11 @@ message HelloRequest {
message HelloReply {
string message = 1;
}

message HiRequest {
string name = 1;
}

message HiReply {
string message = 1;
}
60 changes: 51 additions & 9 deletions test/grpc/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,28 @@ describe('test/grpc/index.test.js', () => {
logger,
port,
});
server.addService({
interfaceName: 'helloworld.Greeter',
}, {
async SayHello(req) {
await sleep(200);
return {
message: `hello ${req.name}`,
};
server.addService(
{
interfaceName: 'helloworld.Greeter',
},
});
{
async SayHello(req) {
await sleep(200);
return {
message: `hello ${req.name}`,
};
},
async SayHi(req) {
await sleep(100);
if (req.name === 'throw') {
throw new Error('test error message');
}
return {
message: `hi ${req.name}`,
};
},
}
);
await server.start();
client = new GRpcClient({
proto,
Expand Down Expand Up @@ -85,4 +97,34 @@ describe('test/grpc/index.test.js', () => {
.invoke('SayHello', { name: 'world' })
.expect({ message: 'hello world' });
});

it('should invoke gRPC multi times works fine', async function() {
const helloResult = await request(server)
.service('helloworld.Greeter')
.invoke('SayHello', { name: 'world' });
helloResult.consumer.close();
assert.deepEqual(helloResult.data, { message: 'hello world' });
const hiResult = await request(server)
.service('helloworld.Greeter')
.invoke('SayHi', { name: 'world' });
hiResult.consumer.close();
assert.deepEqual(hiResult.data, { message: 'hi world' });
});

it('should get error response when service throw exception', async function() {
const consumer = client.createConsumer({
interfaceName: 'helloworld.Greeter',
serverHost: 'http://localhost:' + port,
});
await consumer.ready();
try {
await consumer.invoke('SayHi', [{ name: 'throw' }], {});
assert(false);
} catch (e) {
assert(e.code === 2);
assert(e.message === 'test error message');
} finally {
consumer.close();
}
});
});
2 changes: 1 addition & 1 deletion test/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ describe('test/test/index.test.js', () => {
assert(false, 'should no run here');
})
.catch(err => {
assert(err.message === 'Error: mock error');
assert(err.message.includes('Error: mock error'));
});
});
});
Expand Down