From 6ab0178846fd3a8afc9e3563363da5a08b7a6d7e Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Mon, 15 Apr 2024 13:54:58 +0300 Subject: [PATCH 1/2] [PECO-1541] Optimize UC Volume ingestion Signed-off-by: Levko Kravets --- lib/DBSQLSession.ts | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index 9863bc0c..dc0ec09a 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -1,5 +1,6 @@ import * as fs from 'fs'; import * as path from 'path'; +import { pipeline } from 'stream'; import { stringify, NIL, parse } from 'uuid'; import fetch, { HeadersInit } from 'node-fetch'; import { @@ -271,8 +272,23 @@ export default class DBSQLSession implements IDBSQLSession { if (!response.ok) { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); } - const buffer = await response.arrayBuffer(); - fs.writeFileSync(localFile, Buffer.from(buffer)); + + return new Promise((resolve, reject) => { + try { + const fileStream = fs.createWriteStream(localFile); + // `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly + // Also, we use callback-style `pipeline` because Promise-style one is not available in Node 14 + pipeline(response.body, fileStream, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + } catch (error) { + reject(error); + } + }); } private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise { @@ -301,8 +317,19 @@ export default class DBSQLSession implements IDBSQLSession { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); - const data = fs.readFileSync(localFile); - const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data }); + const fileStream = fs.createReadStream(localFile); + const fileInfo = fs.statSync(localFile, { bigint: true }); + + const response = await fetch(presignedUrl, { + method: 'PUT', + headers: { + ...headers, + // This header is required by server + 'Content-Length': fileInfo.size.toString(), + }, + agent, + body: fileStream, + }); if (!response.ok) { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); } From 44c5d626f28662b7a0eac5cf075be0bcbc798351 Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Tue, 16 Apr 2024 07:27:08 +0300 Subject: [PATCH 2/2] Refine the code Signed-off-by: Levko Kravets --- lib/DBSQLSession.ts | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index dc0ec09a..a9bbbb2e 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -1,6 +1,7 @@ import * as fs from 'fs'; import * as path from 'path'; -import { pipeline } from 'stream'; +import stream from 'node:stream'; +import util from 'node:util'; import { stringify, NIL, parse } from 'uuid'; import fetch, { HeadersInit } from 'node-fetch'; import { @@ -37,6 +38,9 @@ import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter'; import ParameterError from './errors/ParameterError'; import IClientContext, { ClientConfig } from './contracts/IClientContext'; +// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14 +const pipeline = util.promisify(stream.pipeline); + const defaultMaxRows = 100000; interface OperationResponseShape { @@ -273,22 +277,9 @@ export default class DBSQLSession implements IDBSQLSession { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); } - return new Promise((resolve, reject) => { - try { - const fileStream = fs.createWriteStream(localFile); - // `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly - // Also, we use callback-style `pipeline` because Promise-style one is not available in Node 14 - pipeline(response.body, fileStream, (error) => { - if (error) { - reject(error); - } else { - resolve(); - } - }); - } catch (error) { - reject(error); - } - }); + const fileStream = fs.createWriteStream(localFile); + // `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly + return pipeline(response.body, fileStream); } private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise {