From 70cb12163dd9e29f628dd891e727258884a3e4f5 Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Tue, 23 Apr 2024 12:43:01 +0300 Subject: [PATCH 1/5] Get rid of redundant `lib/hive/Types` Signed-off-by: Levko Kravets --- lib/DBSQLClient.ts | 2 +- lib/DBSQLSession.ts | 2 +- lib/contracts/IDBSQLSession.ts | 2 +- lib/dto/InfoValue.ts | 2 +- lib/hive/Types/index.ts | 39 --------------------------------- lib/result/JsonResultHandler.ts | 21 ++---------------- lib/result/RowSetProvider.ts | 35 +++++++++-------------------- lib/result/utils.ts | 18 ++++++++++++++- 8 files changed, 33 insertions(+), 88 deletions(-) delete mode 100644 lib/hive/Types/index.ts diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts index 57d7225d..4b4ecd68 100644 --- a/lib/DBSQLClient.ts +++ b/lib/DBSQLClient.ts @@ -1,4 +1,5 @@ import thrift from 'thrift'; +import Int64 from 'node-int64'; import { EventEmitter } from 'events'; import TCLIService from '../thrift/TCLIService'; @@ -7,7 +8,6 @@ import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } fr import IDriver from './contracts/IDriver'; import IClientContext, { ClientConfig } from './contracts/IClientContext'; import HiveDriver from './hive/HiveDriver'; -import { Int64 } from './hive/Types'; import DBSQLSession from './DBSQLSession'; import IDBSQLSession from './contracts/IDBSQLSession'; import IAuthentication from './connection/contracts/IAuthentication'; diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index f49e9651..ded6f8bf 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -3,6 +3,7 @@ import * as path from 'path'; import stream from 'node:stream'; import util from 'node:util'; import { stringify, NIL } from 'uuid'; +import Int64 from 'node-int64'; import fetch, { HeadersInit } from 'node-fetch'; import { TSessionHandle, @@ -12,7 +13,6 @@ import { TSparkArrowTypes, TSparkParameter, } from '../thrift/TCLIService_types'; -import { Int64 } from './hive/Types'; import IDBSQLSession, { ExecuteStatementOptions, TypeInfoRequest, diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts index ab5509ef..7342cd16 100644 --- a/lib/contracts/IDBSQLSession.ts +++ b/lib/contracts/IDBSQLSession.ts @@ -1,7 +1,7 @@ +import Int64 from 'node-int64'; import IOperation from './IOperation'; import Status from '../dto/Status'; import InfoValue from '../dto/InfoValue'; -import { Int64 } from '../hive/Types'; import { DBSQLParameter, DBSQLParameterValue } from '../DBSQLParameter'; export type ExecuteStatementOptions = { diff --git a/lib/dto/InfoValue.ts b/lib/dto/InfoValue.ts index 4340b495..ec90f20e 100644 --- a/lib/dto/InfoValue.ts +++ b/lib/dto/InfoValue.ts @@ -1,5 +1,5 @@ +import Int64 from 'node-int64'; import { TGetInfoValue } from '../../thrift/TCLIService_types'; -import { Int64 } from '../hive/Types'; type InfoResultType = string | number | Buffer | Int64 | null; diff --git a/lib/hive/Types/index.ts b/lib/hive/Types/index.ts deleted file mode 100644 index e2eb4e5c..00000000 --- a/lib/hive/Types/index.ts +++ /dev/null @@ -1,39 +0,0 @@ -import Int64 from 'node-int64'; -import { - TBoolColumn, - TByteColumn, - TI16Column, - TI32Column, - TI64Column, - TDoubleColumn, - TStringColumn, - TBinaryColumn, -} from '../../../thrift/TCLIService_types'; - -export { Int64 }; - -export enum ColumnCode { - boolVal = 'boolVal', - byteVal = 'byteVal', - i16Val = 'i16Val', - i32Val = 'i32Val', - i64Val = 'i64Val', - doubleVal = 'doubleVal', - stringVal = 'stringVal', - binaryVal = 'binaryVal', -} - -export type ColumnType = - | TBoolColumn - | TByteColumn - | TI16Column - | TI32Column - | TI64Column - | TDoubleColumn - | TStringColumn - | TBinaryColumn; - -export enum FetchType { - Data = 0, - Logs = 1, -} diff --git a/lib/result/JsonResultHandler.ts b/lib/result/JsonResultHandler.ts index 02a084af..2286b6dd 100644 --- a/lib/result/JsonResultHandler.ts +++ b/lib/result/JsonResultHandler.ts @@ -1,8 +1,7 @@ -import { ColumnCode } from '../hive/Types'; import { TGetResultSetMetadataResp, TRowSet, TColumn, TColumnDesc } from '../../thrift/TCLIService_types'; import IClientContext from '../contracts/IClientContext'; import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider'; -import { getSchemaColumns, convertThriftValue } from './utils'; +import { getSchemaColumns, convertThriftValue, getColumnValue } from './utils'; export default class JsonResultHandler implements IResultsProvider> { private readonly context: IClientContext; @@ -59,7 +58,7 @@ export default class JsonResultHandler implements IResultsProvider> { private getSchemaValues(descriptor: TColumnDesc, column?: TColumn): Array { const typeDescriptor = descriptor.typeDesc.types[0]?.primitiveEntry; - const columnValue = this.getColumnValue(column); + const columnValue = getColumnValue(column); if (!columnValue) { return []; @@ -79,20 +78,4 @@ export default class JsonResultHandler implements IResultsProvider> { return (byte & ofs) !== 0; } - - private getColumnValue(column?: TColumn) { - if (!column) { - return undefined; - } - return ( - column[ColumnCode.binaryVal] || - column[ColumnCode.boolVal] || - column[ColumnCode.byteVal] || - column[ColumnCode.doubleVal] || - column[ColumnCode.i16Val] || - column[ColumnCode.i32Val] || - column[ColumnCode.i64Val] || - column[ColumnCode.stringVal] - ); - } } diff --git a/lib/result/RowSetProvider.ts b/lib/result/RowSetProvider.ts index 5661d208..f3fa4213 100644 --- a/lib/result/RowSetProvider.ts +++ b/lib/result/RowSetProvider.ts @@ -1,14 +1,14 @@ -import { - TColumn, - TFetchOrientation, - TFetchResultsResp, - TOperationHandle, - TRowSet, -} from '../../thrift/TCLIService_types'; -import { ColumnCode, FetchType, Int64 } from '../hive/Types'; +import Int64 from 'node-int64'; +import { TFetchOrientation, TFetchResultsResp, TOperationHandle, TRowSet } from '../../thrift/TCLIService_types'; import Status from '../dto/Status'; import IClientContext from '../contracts/IClientContext'; import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider'; +import { getColumnValue } from './utils'; + +export enum FetchType { + Data = 0, + Logs = 1, +} function checkIfOperationHasMoreRows(response: TFetchResultsResp): boolean { if (response.hasMoreRows) { @@ -17,23 +17,8 @@ function checkIfOperationHasMoreRows(response: TFetchResultsResp): boolean { const columns = response.results?.columns || []; - if (columns.length === 0) { - return false; - } - - const column: TColumn = columns[0]; - - const columnValue = - column[ColumnCode.binaryVal] || - column[ColumnCode.boolVal] || - column[ColumnCode.byteVal] || - column[ColumnCode.doubleVal] || - column[ColumnCode.i16Val] || - column[ColumnCode.i32Val] || - column[ColumnCode.i64Val] || - column[ColumnCode.stringVal]; - - return (columnValue?.values?.length || 0) > 0; + const columnValue = getColumnValue(columns[0]); + return (columnValue?.values?.length ?? 0) > 0; } export default class RowSetProvider implements IResultsProvider { diff --git a/lib/result/utils.ts b/lib/result/utils.ts index 86edc776..4bbdf418 100644 --- a/lib/result/utils.ts +++ b/lib/result/utils.ts @@ -16,7 +16,7 @@ import { DateUnit, RecordBatchWriter, } from 'apache-arrow'; -import { TTableSchema, TColumnDesc, TPrimitiveTypeEntry, TTypeId } from '../../thrift/TCLIService_types'; +import { TTableSchema, TColumnDesc, TPrimitiveTypeEntry, TTypeId, TColumn } from '../../thrift/TCLIService_types'; import HiveDriverError from '../errors/HiveDriverError'; export interface ArrowBatch { @@ -145,3 +145,19 @@ export function hiveSchemaToArrowSchema(schema?: TTableSchema): Buffer | undefin writer.finish(); return Buffer.from(writer.toUint8Array(true)); } + +export function getColumnValue(column?: TColumn) { + if (!column) { + return undefined; + } + return ( + column.binaryVal ?? + column.boolVal ?? + column.byteVal ?? + column.doubleVal ?? + column.i16Val ?? + column.i32Val ?? + column.i64Val ?? + column.stringVal + ); +} From c5f9037d9fe68f597c62d7e0016e1267ce184880 Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Tue, 23 Apr 2024 13:27:49 +0300 Subject: [PATCH 2/5] Allow any number type (number, bigint, Int64) for `maxRows` and `queryTimeout` options Signed-off-by: Levko Kravets --- lib/DBSQLSession.ts | 21 ++++++++++++++++++--- lib/contracts/IDBSQLSession.ts | 22 +++++++++++----------- tests/unit/DBSQLSession.test.js | 27 ++++++++++++++++++++++++++- 3 files changed, 55 insertions(+), 15 deletions(-) diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index ded6f8bf..6364e5a9 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -49,14 +49,29 @@ interface OperationResponseShape { directResults?: TSparkDirectResults; } -function getDirectResultsOptions(maxRows: number | null = defaultMaxRows) { +export function numberToInt64(value: number | bigint | Int64): Int64 { + if (value instanceof Int64) { + return value; + } + + if (typeof value === 'bigint') { + const buffer = new ArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT); + const view = new DataView(buffer); + view.setBigInt64(0, value, false); // `false` to use big-endian order + return new Int64(Buffer.from(buffer)); + } + + return new Int64(value); +} + +function getDirectResultsOptions(maxRows: number | bigint | Int64 | null = defaultMaxRows) { if (maxRows === null) { return {}; } return { getDirectResults: { - maxRows: new Int64(maxRows), + maxRows: numberToInt64(maxRows), }, }; } @@ -184,7 +199,7 @@ export default class DBSQLSession implements IDBSQLSession { const operationPromise = driver.executeStatement({ sessionHandle: this.sessionHandle, statement, - queryTimeout: options.queryTimeout, + queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined, runAsync: true, ...getDirectResultsOptions(options.maxRows), ...getArrowOptions(clientConfig), diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts index 7342cd16..652e7b75 100644 --- a/lib/contracts/IDBSQLSession.ts +++ b/lib/contracts/IDBSQLSession.ts @@ -5,12 +5,12 @@ import InfoValue from '../dto/InfoValue'; import { DBSQLParameter, DBSQLParameterValue } from '../DBSQLParameter'; export type ExecuteStatementOptions = { - queryTimeout?: Int64; + queryTimeout?: number | bigint | Int64; /** * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; useCloudFetch?: boolean; stagingAllowedLocalPath?: string | string[]; namedParameters?: Record; @@ -22,7 +22,7 @@ export type TypeInfoRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export type CatalogsRequest = { @@ -30,7 +30,7 @@ export type CatalogsRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export type SchemasRequest = { @@ -40,7 +40,7 @@ export type SchemasRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export type TablesRequest = { @@ -52,7 +52,7 @@ export type TablesRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export type TableTypesRequest = { @@ -60,7 +60,7 @@ export type TableTypesRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export type ColumnsRequest = { @@ -72,7 +72,7 @@ export type ColumnsRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export type FunctionsRequest = { @@ -83,7 +83,7 @@ export type FunctionsRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export type PrimaryKeysRequest = { @@ -94,7 +94,7 @@ export type PrimaryKeysRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export type CrossReferenceRequest = { @@ -108,7 +108,7 @@ export type CrossReferenceRequest = { * @deprecated This option is no longer supported and will be removed in future releases */ runAsync?: boolean; - maxRows?: number | null; + maxRows?: number | bigint | Int64 | null; }; export default interface IDBSQLSession { diff --git a/tests/unit/DBSQLSession.test.js b/tests/unit/DBSQLSession.test.js index b172b29f..9caac08a 100644 --- a/tests/unit/DBSQLSession.test.js +++ b/tests/unit/DBSQLSession.test.js @@ -1,7 +1,8 @@ const { expect, AssertionError } = require('chai'); const { DBSQLLogger, LogLevel } = require('../../dist'); const sinon = require('sinon'); -const DBSQLSession = require('../../dist/DBSQLSession').default; +const Int64 = require('node-int64'); +const { default: DBSQLSession, numberToInt64 } = require('../../dist/DBSQLSession'); const InfoValue = require('../../dist/dto/InfoValue').default; const Status = require('../../dist/dto/Status').default; const DBSQLOperation = require('../../dist/DBSQLOperation').default; @@ -62,6 +63,30 @@ async function expectFailure(fn) { } describe('DBSQLSession', () => { + describe('numberToInt64', () => { + it('should convert regular number to Int64', () => { + const num = Math.random() * 1000000; + const value = numberToInt64(num); + expect(value.equals(new Int64(num))).to.be.true; + }); + + it('should return Int64 values as is', () => { + const num = new Int64(Math.random() * 1000000); + const value = numberToInt64(num); + expect(value).to.equal(num); + }); + + it('should convert BigInt to Int64', () => { + // This case is especially important, because Int64 has no native methods to convert + // between Int64 and BigInt. This conversion involves some byte operations, and it's + // important to make sure we don't mess up with things like byte order + + const num = BigInt(Math.round(Math.random() * 10000)) * BigInt(Math.round(Math.random() * 10000)); + const value = numberToInt64(num); + expect(value.toString()).equal(num.toString()); + }); + }); + describe('getInfo', () => { it('should run operation', async () => { const session = createSession(); From 2e6c09487716fa47e463f0fee63b3bcc408e024a Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Tue, 23 Apr 2024 13:45:55 +0300 Subject: [PATCH 3/5] Move some global constants to client config Signed-off-by: Levko Kravets --- lib/DBSQLClient.ts | 3 +++ lib/DBSQLOperation.ts | 8 +++---- lib/DBSQLSession.ts | 38 ++++++++++++++++++------------- lib/contracts/IClientContext.ts | 3 +++ tests/unit/DBSQLOperation.test.js | 5 ++++ 5 files changed, 37 insertions(+), 20 deletions(-) diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts index 4b4ecd68..de9d9114 100644 --- a/lib/DBSQLClient.ts +++ b/lib/DBSQLClient.ts @@ -73,6 +73,9 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I private static getDefaultConfig(): ClientConfig { return { + directResultsDefaultMaxRows: 100000, + fetchChunkDefaultMaxRows: 100000, + arrowEnabled: true, useArrowNativeTypes: true, socketTimeout: 15 * 60 * 1000, // 15 minutes diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts index 0e81fa84..91b3bdf8 100644 --- a/lib/DBSQLOperation.ts +++ b/lib/DBSQLOperation.ts @@ -29,8 +29,6 @@ import { definedOrError } from './utils'; import HiveDriverError from './errors/HiveDriverError'; import IClientContext from './contracts/IClientContext'; -const defaultMaxRows = 100000; - interface DBSQLOperationConstructorOptions { handle: TOperationHandle; directResults?: TSparkDirectResults; @@ -164,8 +162,10 @@ export default class DBSQLOperation implements IOperation { setTimeout(resolve, 0); }); + const defaultMaxRows = this.context.getConfig().fetchChunkDefaultMaxRows; + const result = resultHandler.fetchNext({ - limit: options?.maxRows || defaultMaxRows, + limit: options?.maxRows ?? defaultMaxRows, disableBuffering: options?.disableBuffering, }); await this.failIfClosed(); @@ -174,7 +174,7 @@ export default class DBSQLOperation implements IOperation { .getLogger() .log( LogLevel.debug, - `Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.id}`, + `Fetched chunk of size: ${options?.maxRows ?? defaultMaxRows} from operation with id: ${this.id}`, ); return result; } diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index 6364e5a9..768b6b5e 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -41,8 +41,6 @@ import IClientContext, { ClientConfig } from './contracts/IClientContext'; // Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14 const pipeline = util.promisify(stream.pipeline); -const defaultMaxRows = 100000; - interface OperationResponseShape { status: TStatus; operationHandle?: TOperationHandle; @@ -64,14 +62,14 @@ export function numberToInt64(value: number | bigint | Int64): Int64 { return new Int64(value); } -function getDirectResultsOptions(maxRows: number | bigint | Int64 | null = defaultMaxRows) { +function getDirectResultsOptions(maxRows: number | bigint | Int64 | null | undefined, config: ClientConfig) { if (maxRows === null) { return {}; } return { getDirectResults: { - maxRows: numberToInt64(maxRows), + maxRows: numberToInt64(maxRows ?? config.directResultsDefaultMaxRows), }, }; } @@ -101,7 +99,6 @@ function getArrowOptions(config: ClientConfig): { } function getQueryParameters( - sessionHandle: TSessionHandle, namedParameters?: Record, ordinalParameters?: Array, ): Array { @@ -201,10 +198,10 @@ export default class DBSQLSession implements IDBSQLSession { statement, queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined, runAsync: true, - ...getDirectResultsOptions(options.maxRows), + ...getDirectResultsOptions(options.maxRows, clientConfig), ...getArrowOptions(clientConfig), canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch, - parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters), + parameters: getQueryParameters(options.namedParameters, options.ordinalParameters), canDecompressLZ4Result: clientConfig.useLZ4Compression && Boolean(LZ4), }); const response = await this.handleResponse(operationPromise); @@ -354,10 +351,11 @@ export default class DBSQLSession implements IDBSQLSession { public async getTypeInfo(request: TypeInfoRequest = {}): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getTypeInfo({ sessionHandle: this.sessionHandle, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); @@ -372,10 +370,11 @@ export default class DBSQLSession implements IDBSQLSession { public async getCatalogs(request: CatalogsRequest = {}): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getCatalogs({ sessionHandle: this.sessionHandle, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); @@ -390,12 +389,13 @@ export default class DBSQLSession implements IDBSQLSession { public async getSchemas(request: SchemasRequest = {}): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getSchemas({ sessionHandle: this.sessionHandle, catalogName: request.catalogName, schemaName: request.schemaName, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); @@ -410,6 +410,7 @@ export default class DBSQLSession implements IDBSQLSession { public async getTables(request: TablesRequest = {}): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getTables({ sessionHandle: this.sessionHandle, catalogName: request.catalogName, @@ -417,7 +418,7 @@ export default class DBSQLSession implements IDBSQLSession { tableName: request.tableName, tableTypes: request.tableTypes, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); @@ -432,10 +433,11 @@ export default class DBSQLSession implements IDBSQLSession { public async getTableTypes(request: TableTypesRequest = {}): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getTableTypes({ sessionHandle: this.sessionHandle, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); @@ -450,6 +452,7 @@ export default class DBSQLSession implements IDBSQLSession { public async getColumns(request: ColumnsRequest = {}): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getColumns({ sessionHandle: this.sessionHandle, catalogName: request.catalogName, @@ -457,7 +460,7 @@ export default class DBSQLSession implements IDBSQLSession { tableName: request.tableName, columnName: request.columnName, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); @@ -472,13 +475,14 @@ export default class DBSQLSession implements IDBSQLSession { public async getFunctions(request: FunctionsRequest): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getFunctions({ sessionHandle: this.sessionHandle, catalogName: request.catalogName, schemaName: request.schemaName, functionName: request.functionName, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); @@ -487,13 +491,14 @@ export default class DBSQLSession implements IDBSQLSession { public async getPrimaryKeys(request: PrimaryKeysRequest): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getPrimaryKeys({ sessionHandle: this.sessionHandle, catalogName: request.catalogName, schemaName: request.schemaName, tableName: request.tableName, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); @@ -508,6 +513,7 @@ export default class DBSQLSession implements IDBSQLSession { public async getCrossReference(request: CrossReferenceRequest): Promise { await this.failIfClosed(); const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); const operationPromise = driver.getCrossReference({ sessionHandle: this.sessionHandle, parentCatalogName: request.parentCatalogName, @@ -517,7 +523,7 @@ export default class DBSQLSession implements IDBSQLSession { foreignSchemaName: request.foreignSchemaName, foreignTableName: request.foreignTableName, runAsync: true, - ...getDirectResultsOptions(request.maxRows), + ...getDirectResultsOptions(request.maxRows, clientConfig), }); const response = await this.handleResponse(operationPromise); return this.createOperation(response); diff --git a/lib/contracts/IClientContext.ts b/lib/contracts/IClientContext.ts index 712b845f..46c46c4b 100644 --- a/lib/contracts/IClientContext.ts +++ b/lib/contracts/IClientContext.ts @@ -4,6 +4,9 @@ import IConnectionProvider from '../connection/contracts/IConnectionProvider'; import TCLIService from '../../thrift/TCLIService'; export interface ClientConfig { + directResultsDefaultMaxRows: number; + fetchChunkDefaultMaxRows: number; + arrowEnabled?: boolean; useArrowNativeTypes?: boolean; socketTimeout: number; diff --git a/tests/unit/DBSQLOperation.test.js b/tests/unit/DBSQLOperation.test.js index 99cc1e66..e96eae43 100644 --- a/tests/unit/DBSQLOperation.test.js +++ b/tests/unit/DBSQLOperation.test.js @@ -2,6 +2,7 @@ const { expect, AssertionError } = require('chai'); const sinon = require('sinon'); const { DBSQLLogger, LogLevel } = require('../../dist'); const { TStatusCode, TOperationState, TTypeId, TSparkRowSetType } = require('../../thrift/TCLIService_types'); +const DBSQLClient = require('../../dist/DBSQLClient').default; const DBSQLOperation = require('../../dist/DBSQLOperation').default; const StatusError = require('../../dist/errors/StatusError').default; const OperationStateError = require('../../dist/errors/OperationStateError').default; @@ -109,6 +110,10 @@ class ClientContextMock { this.driver = new DriverMock(); } + getConfig() { + return DBSQLClient.getDefaultConfig(); + } + getLogger() { return this.logger; } From ddfbb09ad3a8f6174af89ab14f105f0a2d297946 Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Tue, 23 Apr 2024 13:57:30 +0300 Subject: [PATCH 4/5] Add a note about `queryTimeout` option Signed-off-by: Levko Kravets --- lib/contracts/IDBSQLSession.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts index 652e7b75..8a0d8bf0 100644 --- a/lib/contracts/IDBSQLSession.ts +++ b/lib/contracts/IDBSQLSession.ts @@ -5,6 +5,11 @@ import InfoValue from '../dto/InfoValue'; import { DBSQLParameter, DBSQLParameterValue } from '../DBSQLParameter'; export type ExecuteStatementOptions = { + /** + * The number of seconds after which the query will time out on the server. + * Effective only with Compute clusters. For SQL Warehouses, `STATEMENT_TIMEOUT` + * configuration should be used + */ queryTimeout?: number | bigint | Int64; /** * @deprecated This option is no longer supported and will be removed in future releases From 7eb6409fcccbdee45a9262f87e242c88073b9cc0 Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Tue, 23 Apr 2024 21:58:28 +0300 Subject: [PATCH 5/5] Use a proper error class instead of a generic `Error` Signed-off-by: Levko Kravets --- .../auth/DatabricksOAuth/AuthorizationCode.ts | 7 ++++--- lib/connection/auth/DatabricksOAuth/OAuthManager.ts | 12 ++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/lib/connection/auth/DatabricksOAuth/AuthorizationCode.ts b/lib/connection/auth/DatabricksOAuth/AuthorizationCode.ts index 03845c74..f6973a4d 100644 --- a/lib/connection/auth/DatabricksOAuth/AuthorizationCode.ts +++ b/lib/connection/auth/DatabricksOAuth/AuthorizationCode.ts @@ -4,6 +4,7 @@ import open from 'open'; import { LogLevel } from '../../../contracts/IDBSQLLogger'; import { OAuthScopes, scopeDelimiter } from './OAuthScope'; import IClientContext from '../../../contracts/IClientContext'; +import AuthenticationError from '../../../errors/AuthenticationError'; export interface AuthorizationCodeOptions { client: BaseClient; @@ -113,9 +114,9 @@ export default class AuthorizationCode { if (!receivedParams || !receivedParams.code) { if (receivedParams?.error) { const errorMessage = `OAuth error: ${receivedParams.error} ${receivedParams.error_description}`; - throw new Error(errorMessage); + throw new AuthenticationError(errorMessage); } - throw new Error(`No path parameters were returned to the callback at ${redirectUri}`); + throw new AuthenticationError(`No path parameters were returned to the callback at ${redirectUri}`); } return { code: receivedParams.code, verifier: verifierString, redirectUri }; @@ -152,7 +153,7 @@ export default class AuthorizationCode { } } - throw new Error('Failed to start server: all ports are in use'); + throw new AuthenticationError('Failed to start server: all ports are in use'); } private renderCallbackResponse(): string { diff --git a/lib/connection/auth/DatabricksOAuth/OAuthManager.ts b/lib/connection/auth/DatabricksOAuth/OAuthManager.ts index c1c41345..02bbde15 100644 --- a/lib/connection/auth/DatabricksOAuth/OAuthManager.ts +++ b/lib/connection/auth/DatabricksOAuth/OAuthManager.ts @@ -1,6 +1,6 @@ import http from 'http'; import { Issuer, BaseClient, custom } from 'openid-client'; -import HiveDriverError from '../../../errors/HiveDriverError'; +import AuthenticationError from '../../../errors/AuthenticationError'; import { LogLevel } from '../../../contracts/IDBSQLLogger'; import OAuthToken from './OAuthToken'; import AuthorizationCode from './AuthorizationCode'; @@ -104,7 +104,7 @@ export default abstract class OAuthManager { if (!token.refreshToken) { const message = `OAuth access token expired on ${token.expirationTime}.`; this.context.getLogger().log(LogLevel.error, message); - throw new HiveDriverError(message); + throw new AuthenticationError(message); } // Try to refresh using the refresh token @@ -115,7 +115,7 @@ export default abstract class OAuthManager { const client = await this.getClient(); const { access_token: accessToken, refresh_token: refreshToken } = await client.refresh(token.refreshToken); if (!accessToken || !refreshToken) { - throw new Error('Failed to refresh token: invalid response'); + throw new AuthenticationError('Failed to refresh token: invalid response'); } return new OAuthToken(accessToken, refreshToken, token.scopes); } @@ -165,7 +165,7 @@ export default abstract class OAuthManager { }); if (!accessToken) { - throw new Error('Failed to fetch access token'); + throw new AuthenticationError('Failed to fetch access token'); } return new OAuthToken(accessToken, refreshToken, mappedScopes); } @@ -185,7 +185,7 @@ export default abstract class OAuthManager { }); if (!accessToken) { - throw new Error('Failed to fetch access token'); + throw new AuthenticationError('Failed to fetch access token'); } return new OAuthToken(accessToken, undefined, mappedScopes); } @@ -234,7 +234,7 @@ export default abstract class OAuthManager { } } - throw new Error(`OAuth is not supported for ${options.host}`); + throw new AuthenticationError(`OAuth is not supported for ${options.host}`); } }