fix: pause request stream on backpressure #936
Conversation
The request stream should be paused if the downstream indicates that it cannot handle any more data at the moment, and resumed once the downstream accepts data again. This reduces memory consumption and prevents potential out-of-memory errors when a result stream is piped into a slow writer. Fixes googleapis#934
|
This change looks good to me @olavloite. Thanks for working on this. I can't say I have the expertise to review it properly. @bcoe could I kindly ask you to take a look to see if this is reasonable?
|
I'm also very confused by these test failures that are happening on all PRs. I can't reproduce them locally.
|
I've been doing some additional testing with and without this fix using the function below. It does a select from a Spanner database, transforms the rows into JSON strings, and writes them to a file. There is also a custom transformer in the pipeline that artificially slows the write progress every 50 rows to simulate a slow flush. Running this script with and without the change in this PR on a mid-sized (87MB) and a huge (2GB) result set gives the following results for maximum memory usage:

Normal result set (87MB)

Huge result set (2GB)
The exact effect of this fix will depend a lot on multiple factors. Larger values for any of these factors will result in higher memory usage in all cases, and without this fix it can cause the entire result set to be loaded into memory.

Without slow flush

It should also be noted that streaming result sets will in most cases be extremely efficient. Running the same test without the simulated slow flushes shows the same results both with and without this fix. It also shows the effect of how Spanner chunks the result set: the memory consumption when streaming the huge result set is a lot lower than when streaming the normal result set (test results are equal with and without this fix).
Test script

```javascript
async function queryWithMemUsage(instanceId, databaseId, projectId) {
  // Imports the Google Cloud client library
  const {Spanner} = require('@google-cloud/spanner');
  // eslint-disable-next-line node/no-unpublished-require
  // const {Spanner} = require('../build/src');
  const fs = require('fs');
  const stream = require('stream');
  const util = require('util');
  // eslint-disable-next-line node/no-unsupported-features/node-builtins
  const pipeline = util.promisify(stream.pipeline);

  // Creates a client
  const spanner = new Spanner({
    projectId: projectId,
  });

  // Gets a reference to a Cloud Spanner instance and database
  const instance = spanner.instance(instanceId);
  const database = instance.database(databaseId);
  const query = {
    sql: `SELECT *
          FROM TableWithAllColumnTypes
          ORDER BY ColInt64`,
  };
  let count = 0;
  let maxMemMeasured = 0;
  const fileStream = fs.createWriteStream('/home/loite/rs.txt');
  const rs = database.runStream(query);
  console.time('process result set');
  // eslint-disable-next-line node/no-unsupported-features/node-builtins
  await pipeline(
    rs,
    new stream.Transform({
      objectMode: true,
      highWaterMark: 100,
      transform(chunk, encoding, callback) {
        count++;
        if (count % 100 === 0) {
          console.log(`Processed ${count} rows so far`);
          // Requires running node with the --expose-gc flag.
          global.gc();
          const used = process.memoryUsage().heapUsed / 1024 / 1024;
          const memUsed = Math.round(used * 100) / 100;
          console.log(`Current mem usage: ${memUsed} MB`);
          maxMemMeasured = Math.max(maxMemMeasured, memUsed);
        }
        this.push(`${JSON.stringify(chunk.toJSON({wrapNumbers: true}))}\n`);
        callback();
      },
    }),
    // Create an artificially slow transformer to simulate network latency.
    new stream.Transform({
      highWaterMark: 100,
      transform(chunk, encoding, callback) {
        // Simulate a slow flush every 50 records.
        if (count % 50 === 0) {
          setTimeout(() => {
            this.push(chunk, encoding);
            callback();
          }, Math.random() * 200 + 100);
        } else {
          this.push(chunk, encoding);
          callback();
        }
      },
    }),
    fileStream
  );
  console.timeEnd('process result set');
  console.log(`Max memory used: ${maxMemMeasured} MB`);
  console.log('Finished writing file');
  await database.close();
}
```
|
@olavloite this is pretty great. We've been carrying around the subtle issue where we have a single data event that needs to be split apart and forwarded to the next stream as multiple data events. That's the core issue we're attacking here, right?

One thing I noticed-- it looks like the transform stream would retry indefinitely if the consumer never becomes ready. We probably want a cap on the maximum number of attempts to avoid that.

I have a module "split-array-stream" which currently has the same problem. We use split-array-stream throughout various libraries, maybe even this one, although it looks like not from this file. I haven't merged this change yet, but would this class either be plug-and-playable here, or be useful in some way to incorporate in PartialResultStream? The description of this PR shows how it can be used: stephenplusplus/split-array-stream#4. Note that it's not merged and released, as it has not yet had a formal review. However, at the time, I had put it through similar tests as you did for this change. Since it seems like we're attacking the same issue, I thought it could be worth checking out. Let me know what you think!
|
@stephenplusplus

Correct. Cloud Spanner returns a stream of PartialResultSets, and a single PartialResultSet can contain data for many rows that need to be forwarded downstream as separate data events.

Good point. I'll add an escape for that possibility.

Regarding split-array-stream: it does, however, seem to be solving much of the same problem that we are having here, but there's one thing that might be different (or that I'm missing): during my testing of the Spanner client library without this PR I was able to make it go out of memory. That seems to have been caused by the request stream that kept pushing data even though the downstream could not keep up.
PartialResultSetStream should stop retrying to push data into the stream after a configurable number of retries have failed.
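A sketch of what such a cap could look like (purely illustrative; `pushWithRetry`, `isReady`, and the limits are made-up names, not the actual implementation):

```javascript
// Retry pushing a row while the downstream is not ready; give up after
// maxAttempts so a permanently stalled consumer cannot make us retry
// forever. The caller should destroy the stream with an error when this
// returns false.
function pushWithRetry(push, row, isReady, maxAttempts) {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    if (isReady()) {
      push(row);
      return true;
    }
  }
  return false;
}

// A consumer that becomes ready on the third check.
let checks = 0;
const pushed = [];
const ok = pushWithRetry(r => pushed.push(r), 'row-1', () => ++checks >= 3, 5);
console.log(ok, pushed); // true [ 'row-1' ]

// A consumer that never becomes ready: we give up instead of spinning.
const gaveUp = pushWithRetry(r => pushed.push(r), 'row-2', () => false, 5);
console.log(gaveUp); // false
```

In a real stream the attempts would be spaced out with a timer rather than a tight loop; the sketch only shows the bounded-attempts logic.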
Codecov Report

```
@@           Coverage Diff            @@
##           master     #936   +/-   ##
=======================================
  Coverage   98.26%   98.26%
=======================================
  Files          21       21
  Lines       20356    20423    +67
  Branches     1084     1096    +12
=======================================
+ Hits        20002    20069    +67
  Misses        351      351
  Partials        3        3
```

Continue to review full report at Codecov.
|
|
@stephenplusplus Would you mind taking a second look at this?
|
@olavloite looks good to me. Just a thought that I know would be inconvenient to implement, and potentially not worth it because of that-- after this change, the stream philosophy of "each stream doesn't concern itself with another stream" is broken when we pass the request stream into the PartialResultStream.

Possibly a way around this would be to have the PRS emit events to indicate it needs a break, and concerned streams could react as a result. Something like:

```diff
 values.forEach(value => {
   res = this._addValue(value) && res;
   if (!res && !this._requestStream.isPaused()) {
-    this._requestStream.pause();
+    this.emit('paused');
   }
 });
```

```diff
 requestsStream
   .pipe(batchAndSplitOnTokenStream)
   // If we get this error, the checkpoint stream has flushed any rows
   // it had queued. We can now destroy the user's stream, as our retry
   // attempts are over.
   .on('error', (err: Error) => userStream.destroy(err))
   .on('checkpoint', (row: google.spanner.v1.PartialResultSet) => {
     lastResumeToken = row.resumeToken;
   })
   .pipe(userStream)
+  .on('paused', () => {
+    requestsStream.pause();
+  })
```

In fact, writing that made me realize that should work by default, shouldn't it? The only reason we don't get built-in backpressure is because we do the one-to-many split of data events. But now that we push each data event singularly, you should be able to pause the PRS stream itself and have the streams before it react properly automatically.
I like the idea of this. It does make it more idiomatic while the end result is the same. We do need two events, though: one to signal that the request stream should be paused and one to signal that it can be resumed.
|
Thanks for doing that!