Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as fs from 'fs';
import * as path from 'path';
import stream from 'node:stream';
import util from 'node:util';
import { stringify, NIL, parse } from 'uuid';
import fetch, { HeadersInit } from 'node-fetch';
import {
Expand Down Expand Up @@ -36,6 +38,9 @@ import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter';
import ParameterError from './errors/ParameterError';
import IClientContext, { ClientConfig } from './contracts/IClientContext';

// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14
const pipeline = util.promisify(stream.pipeline);

const defaultMaxRows = 100000;

interface OperationResponseShape {
Expand Down Expand Up @@ -271,8 +276,10 @@ export default class DBSQLSession implements IDBSQLSession {
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
const buffer = await response.arrayBuffer();
fs.writeFileSync(localFile, Buffer.from(buffer));

const fileStream = fs.createWriteStream(localFile);
// `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly
return pipeline(response.body, fileStream);
}

private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise<void> {
Expand Down Expand Up @@ -301,8 +308,19 @@ export default class DBSQLSession implements IDBSQLSession {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();

const data = fs.readFileSync(localFile);
const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data });
const fileStream = fs.createReadStream(localFile);
const fileInfo = fs.statSync(localFile, { bigint: true });

const response = await fetch(presignedUrl, {
method: 'PUT',
headers: {
...headers,
// This header is required by server
'Content-Length': fileInfo.size.toString(),
},
agent,
body: fileStream,
});
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
Expand Down