@@ -2,6 +2,7 @@ import { ClientDuplexStream } from '@grpc/grpc-js';
22import { TextDecoder , TextEncoder } from 'util' ;
33import { injectable , inject , named } from 'inversify' ;
44import { Struct } from 'google-protobuf/google/protobuf/struct_pb' ;
5+ import { Emitter } from '@theia/core/lib/common/event' ;
56import { ILogger } from '@theia/core/lib/common/logger' ;
67import { MonitorService , MonitorServiceClient , MonitorConfig , MonitorError , Status } from '../../common/protocol/monitor-service' ;
78import { StreamingOpenReq , StreamingOpenResp , MonitorConfig as GrpcMonitorConfig } from '../cli-protocol/monitor/monitor_pb' ;
@@ -46,6 +47,8 @@ export class MonitorServiceImpl implements MonitorService {
4647
4748 protected client ?: MonitorServiceClient ;
4849 protected connection ?: { duplex : ClientDuplexStream < StreamingOpenReq , StreamingOpenResp > , config : MonitorConfig } ;
50+ protected messages : string [ ] = [ ] ;
51+ protected onMessageDidReadEmitter = new Emitter < void > ( ) ;
4952
5053 setClient ( client : MonitorServiceClient | undefined ) : void {
5154 this . client = client ;
@@ -86,11 +89,10 @@ export class MonitorServiceImpl implements MonitorService {
8689 } ) . bind ( this ) ) ;
8790
8891 duplex . on ( 'data' , ( ( resp : StreamingOpenResp ) => {
89- if ( this . client ) {
90- const raw = resp . getData ( ) ;
91- const data = typeof raw === 'string' ? raw : new TextDecoder ( 'utf8' ) . decode ( raw ) ;
92- this . client . notifyRead ( { data } ) ;
93- }
92+ const raw = resp . getData ( ) ;
93+ const message = typeof raw === 'string' ? raw : new TextDecoder ( 'utf8' ) . decode ( raw ) ;
94+ this . messages . push ( message ) ;
95+ this . onMessageDidReadEmitter . fire ( ) ;
9496 } ) . bind ( this ) ) ;
9597
9698 const { type, port } = config ;
@@ -116,27 +118,31 @@ export class MonitorServiceImpl implements MonitorService {
116118 }
117119
118120 async disconnect ( reason ?: MonitorError ) : Promise < Status > {
119- if ( ! this . connection && reason && reason . code === MonitorError . ErrorCodes . CLIENT_CANCEL ) {
121+ try {
122+ if ( ! this . connection && reason && reason . code === MonitorError . ErrorCodes . CLIENT_CANCEL ) {
123+ return Status . OK ;
124+ }
125+ this . logger . info ( `>>> Disposing monitor connection...` ) ;
126+ if ( ! this . connection ) {
127+ this . logger . warn ( `<<< Not connected. Nothing to dispose.` ) ;
128+ return Status . NOT_CONNECTED ;
129+ }
130+ const { duplex, config } = this . connection ;
131+ duplex . cancel ( ) ;
132+ this . logger . info ( `<<< Disposed monitor connection for ${ Board . toString ( config . board , { useFqbn : false } ) } on port ${ Port . toString ( config . port ) } .` ) ;
133+ this . connection = undefined ;
120134 return Status . OK ;
135+ } finally {
136+ this . messages . length = 0 ;
121137 }
122- this . logger . info ( `>>> Disposing monitor connection...` ) ;
123- if ( ! this . connection ) {
124- this . logger . warn ( `<<< Not connected. Nothing to dispose.` ) ;
125- return Status . NOT_CONNECTED ;
126- }
127- const { duplex, config } = this . connection ;
128- duplex . cancel ( ) ;
129- this . logger . info ( `<<< Disposed monitor connection for ${ Board . toString ( config . board , { useFqbn : false } ) } on port ${ Port . toString ( config . port ) } .` ) ;
130- this . connection = undefined ;
131- return Status . OK ;
132138 }
133139
134- async send ( data : string ) : Promise < Status > {
140+ async send ( message : string ) : Promise < Status > {
135141 if ( ! this . connection ) {
136142 return Status . NOT_CONNECTED ;
137143 }
138144 const req = new StreamingOpenReq ( ) ;
139- req . setData ( new TextEncoder ( ) . encode ( data ) ) ;
145+ req . setData ( new TextEncoder ( ) . encode ( message ) ) ;
140146 return new Promise < Status > ( resolve => {
141147 if ( this . connection ) {
142148 this . connection . duplex . write ( req , ( ) => {
@@ -148,6 +154,19 @@ export class MonitorServiceImpl implements MonitorService {
148154 } ) ;
149155 }
150156
157+ async request ( ) : Promise < { message : string } > {
158+ const message = this . messages . shift ( ) ;
159+ if ( message ) {
160+ return { message } ;
161+ }
162+ return new Promise < { message : string } > ( resolve => {
163+ const toDispose = this . onMessageDidReadEmitter . event ( ( ) => {
164+ toDispose . dispose ( ) ;
165+ resolve ( this . request ( ) ) ;
166+ } ) ;
167+ } ) ;
168+ }
169+
151170 protected mapType ( type ?: MonitorConfig . ConnectionType ) : GrpcMonitorConfig . TargetType {
152171 switch ( type ) {
153172 case MonitorConfig . ConnectionType . SERIAL : return GrpcMonitorConfig . TargetType . SERIAL ;
0 commit comments