@@ -19,6 +19,7 @@ import (
1919 "errors"
2020 "fmt"
2121 "io"
22+ "sync/atomic"
2223
2324 "github.com/arduino/arduino-cli/arduino/monitors"
2425 rpc "github.com/arduino/arduino-cli/rpc/monitor"
@@ -79,6 +80,17 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
7980 streamClosed := make (chan error )
8081 targetClosed := make (chan error )
8182
83+ // set rate limiting window
84+ bufferSize := int (config .GetRecvRateLimitBuffer ())
85+ rateLimitEnabled := (bufferSize > 0 )
86+ if ! rateLimitEnabled {
87+ bufferSize = 1024
88+ }
89+ buffer := make ([]byte , bufferSize )
90+ bufferUsed := 0
91+
92+ var writeSlots int32
93+
8294 // now we can read the other messages and re-route to the monitor...
8395 go func () {
8496 for {
@@ -95,6 +107,11 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
95107 break
96108 }
97109
110+ if rateLimitEnabled {
111+ // Increase rate limiter write slots
112+ atomic .AddInt32 (& writeSlots , msg .GetRecvAcknowledge ())
113+ }
114+
98115 if _ , err := mon .Write (msg .GetData ()); err != nil {
99116 // error writing to target
100117 targetClosed <- err
@@ -105,27 +122,59 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
105122
106123 // ...and read from the monitor and forward to the output stream
107124 go func () {
108- buf := make ([]byte , 8 )
125+ dropBuffer := make ([]byte , 10240 )
126+ dropped := 0
109127 for {
110- n , err := mon .Read (buf )
111- if err != nil {
112- // error reading from target
113- targetClosed <- err
114- break
115- }
116-
117- if n == 0 {
118- // target was closed
119- targetClosed <- nil
120- break
128+ if bufferUsed < bufferSize {
129+ if n , err := mon .Read (buffer [bufferUsed :]); err != nil {
130+ // error reading from target
131+ targetClosed <- err
132+ break
133+ } else if n == 0 {
134+ // target was closed
135+ targetClosed <- nil
136+ break
137+ } else {
138+ bufferUsed += n
139+ }
140+ } else {
141+ // FIXME: a very rare condition but still...
142+ // we may be waiting here while, in the meantime, a transmit slot is
143+ // freed: in this case the (filled) buffer will stay in the server
144+ // until the following Read exits (-> the next char arrives from the
145+ // monitor).
146+
147+ if n , err := mon .Read (dropBuffer ); err != nil {
148+ // error reading from target
149+ targetClosed <- err
150+ break
151+ } else if n == 0 {
152+ // target was closed
153+ targetClosed <- nil
154+ break
155+ } else {
156+ dropped += n
157+ }
121158 }
122159
123- if err = stream .Send (& rpc.StreamingOpenResp {
124- Data : buf [:n ],
125- }); err != nil {
126- // error sending to stream
127- streamClosed <- err
128- break
160+ slots := atomic .LoadInt32 (& writeSlots )
161+ if ! rateLimitEnabled || slots > 0 {
162+ if err = stream .Send (& rpc.StreamingOpenResp {
163+ Data : buffer [:bufferUsed ],
164+ Dropped : int32 (dropped ),
165+ }); err != nil {
166+ // error sending to stream
167+ streamClosed <- err
168+ break
169+ }
170+ bufferUsed = 0
171+ dropped = 0
172+
173+ // Rate limit, filling all the available window
174+ if rateLimitEnabled {
175+ slots = atomic .AddInt32 (& writeSlots , - 1 )
176+ //fmt.Println("FREE SLOTS:", slots)
177+ }
129178 }
130179 }
131180 }()
0 commit comments