Skip to content

Commit f84b416

Browse files
committed
net: allow reading data into a static buffer
1 parent ccf37b3 commit f84b416

File tree

14 files changed

+225
-66
lines changed

14 files changed

+225
-66
lines changed

benchmark/net/net-s2c.js

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,48 +5,70 @@ const common = require('../common.js');
55
const PORT = common.PORT;
66

77
const bench = common.createBenchmark(main, {
8-
len: [64, 102400, 1024 * 1024 * 16],
8+
sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
99
type: ['utf', 'asc', 'buf'],
10+
recvbuflen: [0, 64 * 1024, 1024 * 1024],
1011
dur: [5]
1112
});
1213

1314
var chunk;
1415
var encoding;
16+
var recvbuf;
17+
var received = 0;
18+
19+
function main({ dur, sendchunklen, type, recvbuflen }) {
20+
if (isFinite(recvbuflen) && recvbuflen > 0)
21+
recvbuf = Buffer.alloc(recvbuflen);
1522

16-
function main({ dur, len, type }) {
1723
switch (type) {
1824
case 'buf':
19-
chunk = Buffer.alloc(len, 'x');
25+
chunk = Buffer.alloc(sendchunklen, 'x');
2026
break;
2127
case 'utf':
2228
encoding = 'utf8';
23-
chunk = 'ü'.repeat(len / 2);
29+
chunk = 'ü'.repeat(sendchunklen / 2);
2430
break;
2531
case 'asc':
2632
encoding = 'ascii';
27-
chunk = 'x'.repeat(len);
33+
chunk = 'x'.repeat(sendchunklen);
2834
break;
2935
default:
3036
throw new Error(`invalid type: ${type}`);
3137
}
3238

3339
const reader = new Reader();
34-
const writer = new Writer();
40+
var writer;
41+
var socketOpts;
42+
if (recvbuf === undefined) {
43+
writer = new Writer();
44+
socketOpts = { port: PORT };
45+
} else {
46+
socketOpts = {
47+
port: PORT,
48+
onread: {
49+
buffer: recvbuf,
50+
callback: function(nread, buf) {
51+
received += nread;
52+
}
53+
}
54+
};
55+
}
3556

3657
// the actual benchmark.
3758
const server = net.createServer(function(socket) {
3859
reader.pipe(socket);
3960
});
4061

4162
server.listen(PORT, function() {
42-
const socket = net.connect(PORT);
63+
const socket = net.connect(socketOpts);
4364
socket.on('connect', function() {
4465
bench.start();
4566

46-
socket.pipe(writer);
67+
if (recvbuf === undefined)
68+
socket.pipe(writer);
4769

4870
setTimeout(function() {
49-
const bytes = writer.received;
71+
const bytes = received;
5072
const gbits = (bytes * 8) / (1024 * 1024 * 1024);
5173
bench.end(gbits);
5274
process.exit(0);
@@ -58,12 +80,11 @@ function main({ dur, len, type }) {
5880
const net = require('net');
5981

6082
function Writer() {
61-
this.received = 0;
6283
this.writable = true;
6384
}
6485

6586
Writer.prototype.write = function(chunk, encoding, cb) {
66-
this.received += chunk.length;
87+
received += chunk.length;
6788

6889
if (typeof encoding === 'function')
6990
encoding();

lib/internal/stream_base_commons.js

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const { owner_symbol } = require('internal/async_hooks').symbols;
1717
const kMaybeDestroy = Symbol('kMaybeDestroy');
1818
const kUpdateTimer = Symbol('kUpdateTimer');
1919
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
20+
const kBuffer = Symbol('kBuffer');
21+
const kBufferCb = Symbol('kBufferCb');
2022

2123
function handleWriteReq(req, data, encoding) {
2224
const { handle } = req;
@@ -140,9 +142,16 @@ function onStreamRead(arrayBuffer) {
140142
stream[kUpdateTimer]();
141143

142144
if (nread > 0 && !stream.destroyed) {
143-
const offset = streamBaseState[kArrayBufferOffset];
144-
const buf = new FastBuffer(arrayBuffer, offset, nread);
145-
if (!stream.push(buf)) {
145+
var ret;
146+
const userBuf = stream[kBuffer];
147+
if (userBuf) {
148+
ret = (stream[kBufferCb](nread, userBuf) !== false);
149+
} else {
150+
const offset = streamBaseState[kArrayBufferOffset];
151+
const buf = new FastBuffer(arrayBuffer, offset, nread);
152+
ret = stream.push(buf);
153+
}
154+
if (!ret) {
146155
handle.reading = false;
147156
if (!stream.destroyed) {
148157
const err = handle.readStop();
@@ -186,4 +195,6 @@ module.exports = {
186195
kAfterAsyncWrite,
187196
kMaybeDestroy,
188197
kUpdateTimer,
198+
kBuffer,
199+
kBufferCb,
189200
};

lib/net.js

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ const {
6363
writeGeneric,
6464
onStreamRead,
6565
kAfterAsyncWrite,
66-
kUpdateTimer
66+
kUpdateTimer,
67+
kBuffer,
68+
kBufferCb
6769
} = require('internal/stream_base_commons');
6870
const {
6971
codes: {
@@ -101,18 +103,20 @@ function getFlags(ipv6Only) {
101103
return ipv6Only === true ? TCPConstants.UV_TCP_IPV6ONLY : 0;
102104
}
103105

104-
function createHandle(fd, is_server) {
106+
function createHandle(fd, is_server, buf) {
105107
validateInt32(fd, 'fd', 0);
106108
const type = TTYWrap.guessHandleType(fd);
107109
if (type === 'PIPE') {
108110
return new Pipe(
109-
is_server ? PipeConstants.SERVER : PipeConstants.SOCKET
111+
is_server ? PipeConstants.SERVER : PipeConstants.SOCKET,
112+
buf
110113
);
111114
}
112115

113116
if (type === 'TCP') {
114117
return new TCP(
115-
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
118+
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET,
119+
buf
116120
);
117121
}
118122

@@ -241,6 +245,8 @@ function Socket(options) {
241245
this._host = null;
242246
this[kLastWriteQueueSize] = 0;
243247
this[kTimeout] = null;
248+
this[kBuffer] = null;
249+
this[kBufferCb] = null;
244250

245251
if (typeof options === 'number')
246252
options = { fd: options }; // Legacy interface.
@@ -265,40 +271,50 @@ function Socket(options) {
265271
if (options.handle) {
266272
this._handle = options.handle; // private
267273
this[async_id_symbol] = getNewAsyncId(this._handle);
268-
} else if (options.fd !== undefined) {
269-
const { fd } = options;
270-
let err;
271-
272-
// createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
273-
// a valid `PIPE` or `TCP` descriptor
274-
this._handle = createHandle(fd, false);
275-
276-
err = this._handle.open(fd);
274+
} else {
275+
const onread = options.onread;
276+
if (onread !== null && typeof onread === 'object' &&
277+
Buffer.isBuffer(onread.buffer) &&
278+
typeof onread.callback === 'function') {
279+
this[kBuffer] = onread.buffer;
280+
this[kBufferCb] = onread.callback;
281+
}
282+
if (options.fd !== undefined) {
283+
const { fd } = options;
284+
let err;
277285

278-
// While difficult to fabricate, in some architectures
279-
// `open` may return an error code for valid file descriptors
280-
// which cannot be opened. This is difficult to test as most
281-
// un-openable fds will throw on `createHandle`
282-
if (err)
283-
throw errnoException(err, 'open');
286+
// createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
287+
// a valid `PIPE` or `TCP` descriptor
288+
this._handle = createHandle(fd, false);
284289

285-
this[async_id_symbol] = this._handle.getAsyncId();
290+
err = this._handle.open(fd);
286291

287-
if ((fd === 1 || fd === 2) &&
288-
(this._handle instanceof Pipe) &&
289-
process.platform === 'win32') {
290-
// Make stdout and stderr blocking on Windows
291-
err = this._handle.setBlocking(true);
292+
// While difficult to fabricate, in some architectures
293+
// `open` may return an error code for valid file descriptors
294+
// which cannot be opened. This is difficult to test as most
295+
// un-openable fds will throw on `createHandle`
292296
if (err)
293-
throw errnoException(err, 'setBlocking');
294-
295-
this._writev = null;
296-
this._write = makeSyncWrite(fd);
297-
// makeSyncWrite adjusts this value like the original handle would, so
298-
// we need to let it do that by turning it into a writable, own property.
299-
Object.defineProperty(this._handle, 'bytesWritten', {
300-
value: 0, writable: true
301-
});
297+
throw errnoException(err, 'open');
298+
299+
this[async_id_symbol] = this._handle.getAsyncId();
300+
301+
if ((fd === 1 || fd === 2) &&
302+
(this._handle instanceof Pipe) &&
303+
process.platform === 'win32') {
304+
// Make stdout and stderr blocking on Windows
305+
err = this._handle.setBlocking(true);
306+
if (err)
307+
throw errnoException(err, 'setBlocking');
308+
309+
this._writev = null;
310+
this._write = makeSyncWrite(fd);
311+
// makeSyncWrite adjusts this value like the original handle would, so
312+
// we need to let it do that by turning it into a writable, own
313+
// property.
314+
Object.defineProperty(this._handle, 'bytesWritten', {
315+
value: 0, writable: true
316+
});
317+
}
302318
}
303319
}
304320

@@ -888,8 +904,8 @@ Socket.prototype.connect = function(...args) {
888904

889905
if (!this._handle) {
890906
this._handle = pipe ?
891-
new Pipe(PipeConstants.SOCKET) :
892-
new TCP(TCPConstants.SOCKET);
907+
new Pipe(PipeConstants.SOCKET, this[kBuffer]) :
908+
new TCP(TCPConstants.SOCKET, this[kBuffer]);
893909
initSocketHandle(this);
894910
}
895911

src/connection_wrap.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,17 @@ ConnectionWrap<WrapType, UVType>::ConnectionWrap(Environment* env,
2929
reinterpret_cast<uv_stream_t*>(&handle_),
3030
provider) {}
3131

32+
template <typename WrapType, typename UVType>
33+
ConnectionWrap<WrapType, UVType>::ConnectionWrap(Environment* env,
34+
Local<Object> object,
35+
ProviderType provider,
36+
uv_buf_t buf)
37+
: LibuvStreamWrap(env,
38+
object,
39+
reinterpret_cast<uv_stream_t*>(&handle_),
40+
provider,
41+
buf) {}
42+
3243

3344
template <typename WrapType, typename UVType>
3445
void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle,
@@ -116,11 +127,23 @@ template ConnectionWrap<PipeWrap, uv_pipe_t>::ConnectionWrap(
116127
Local<Object> object,
117128
ProviderType provider);
118129

130+
template ConnectionWrap<PipeWrap, uv_pipe_t>::ConnectionWrap(
131+
Environment* env,
132+
Local<Object> object,
133+
ProviderType provider,
134+
uv_buf_t buf);
135+
119136
template ConnectionWrap<TCPWrap, uv_tcp_t>::ConnectionWrap(
120137
Environment* env,
121138
Local<Object> object,
122139
ProviderType provider);
123140

141+
template ConnectionWrap<TCPWrap, uv_tcp_t>::ConnectionWrap(
142+
Environment* env,
143+
Local<Object> object,
144+
ProviderType provider,
145+
uv_buf_t buf);
146+
124147
template void ConnectionWrap<PipeWrap, uv_pipe_t>::OnConnection(
125148
uv_stream_t* handle, int status);
126149

src/connection_wrap.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ class ConnectionWrap : public LibuvStreamWrap {
1919
ConnectionWrap(Environment* env,
2020
v8::Local<v8::Object> object,
2121
ProviderType provider);
22+
ConnectionWrap(Environment* env,
23+
v8::Local<v8::Object> object,
24+
ProviderType provider,
25+
uv_buf_t buf);
2226

2327
UVType handle_;
2428
};

src/pipe_wrap.cc

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,14 @@ void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {
149149
UNREACHABLE();
150150
}
151151

152-
new PipeWrap(env, args.This(), provider, ipc);
152+
if (args.Length() > 1 && Buffer::HasInstance(args[1])) {
153+
uv_buf_t buf;
154+
buf.base = Buffer::Data(args[1]);
155+
buf.len = Buffer::Length(args[1]);
156+
new PipeWrap(env, args.This(), provider, ipc, buf);
157+
} else {
158+
new PipeWrap(env, args.This(), provider, ipc);
159+
}
153160
}
154161

155162

@@ -163,6 +170,17 @@ PipeWrap::PipeWrap(Environment* env,
163170
// Suggestion: uv_pipe_init() returns void.
164171
}
165172

173+
PipeWrap::PipeWrap(Environment* env,
174+
Local<Object> object,
175+
ProviderType provider,
176+
bool ipc,
177+
uv_buf_t buf)
178+
: ConnectionWrap(env, object, provider, buf) {
179+
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
180+
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
181+
// Suggestion: uv_pipe_init() returns void.
182+
}
183+
166184

167185
void PipeWrap::Bind(const FunctionCallbackInfo<Value>& args) {
168186
PipeWrap* wrap;

src/pipe_wrap.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
5555
v8::Local<v8::Object> object,
5656
ProviderType provider,
5757
bool ipc);
58+
PipeWrap(Environment* env,
59+
v8::Local<v8::Object> object,
60+
ProviderType provider,
61+
bool ipc,
62+
uv_buf_t buf);
5863

5964
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
6065
static void Bind(const v8::FunctionCallbackInfo<v8::Value>& args);

src/stream_base-inl.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,25 @@ inline void StreamResource::EmitWantsWrite(size_t suggested_size) {
150150
}
151151

152152
inline StreamBase::StreamBase(Environment* env) : env_(env) {
153+
buf_.base = nullptr;
154+
buf_.len = 0;
155+
PushStreamListener(&default_listener_);
156+
}
157+
158+
inline StreamBase::StreamBase(Environment* env, uv_buf_t buf)
159+
: env_(env),
160+
buf_(buf) {
153161
PushStreamListener(&default_listener_);
154162
}
155163

156164
inline Environment* StreamBase::stream_env() const {
157165
return env_;
158166
}
159167

168+
inline uv_buf_t StreamBase::stream_buf() const {
169+
return buf_;
170+
}
171+
160172
inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
161173
Environment* env = stream_env();
162174

0 commit comments

Comments
 (0)