Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lib/DBSQLOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import IOperation, {
FinishedOptions,
GetSchemaOptions,
WaitUntilReadyOptions,
IteratorOptions,
IOperationChunksIterator,
IOperationRowsIterator,
} from './contracts/IOperation';
import {
TGetOperationStatusResp,
Expand All @@ -26,6 +29,7 @@ import CloudFetchResultHandler from './result/CloudFetchResultHandler';
import ArrowResultConverter from './result/ArrowResultConverter';
import ResultSlicer from './result/ResultSlicer';
import { definedOrError } from './utils';
import { OperationChunksIterator, OperationRowsIterator } from './utils/OperationIterator';
import HiveDriverError from './errors/HiveDriverError';
import IClientContext from './contracts/IClientContext';

Expand Down Expand Up @@ -89,6 +93,14 @@ export default class DBSQLOperation implements IOperation {
this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`);
}

public iterateChunks(options?: IteratorOptions): IOperationChunksIterator {
return new OperationChunksIterator(this, options);
}

public iterateRows(options?: IteratorOptions): IOperationRowsIterator {
return new OperationRowsIterator(this, options);
}

public get id() {
const operationId = this.operationHandle?.operationId?.guid;
return operationId ? stringify(operationId) : NIL;
Expand Down
16 changes: 16 additions & 0 deletions lib/contracts/IOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ export interface GetSchemaOptions extends WaitUntilReadyOptions {
// no other options
}

export interface IteratorOptions extends FetchOptions {
autoClose?: boolean; // defaults to `false`
}

export interface IOperationChunksIterator extends AsyncIterableIterator<Array<object>> {
readonly operation: IOperation;
}

export interface IOperationRowsIterator extends AsyncIterableIterator<object> {
readonly operation: IOperation;
}

export default interface IOperation {
/**
* Operation identifier
Expand Down Expand Up @@ -70,4 +82,8 @@ export default interface IOperation {
* Fetch schema
*/
getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null>;

iterateChunks(options?: IteratorOptions): IOperationChunksIterator;

iterateRows(options?: IteratorOptions): IOperationRowsIterator;
}
85 changes: 85 additions & 0 deletions lib/utils/OperationIterator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import IOperation, { IOperationChunksIterator, IOperationRowsIterator, IteratorOptions } from '../contracts/IOperation';

abstract class OperationIterator<R> implements AsyncIterableIterator<R> {
public readonly operation: IOperation;

protected readonly options?: IteratorOptions;

constructor(operation: IOperation, options?: IteratorOptions) {
this.operation = operation;
this.options = options;
}

protected abstract getNext(): Promise<IteratorResult<R>>;

public [Symbol.asyncIterator]() {
return this;
}

public async next() {
const result = await this.getNext();

if (result.done && this.options?.autoClose) {
await this.operation.close();
}

return result;
}

// This method is intended for a cleanup when the caller does not intend to make any more
// reads from iterator (e.g. when using `break` in a `for ... of` loop)
public async return(value?: any) {
if (this.options?.autoClose) {
await this.operation.close();
}

return { done: true, value };
}
}

export class OperationChunksIterator extends OperationIterator<Array<object>> implements IOperationChunksIterator {
protected async getNext(): Promise<IteratorResult<Array<object>>> {
const hasMoreRows = await this.operation.hasMoreRows();
if (hasMoreRows) {
const value = await this.operation.fetchChunk(this.options);
return { done: false, value };
}

return { done: true, value: undefined };
}
}

export class OperationRowsIterator extends OperationIterator<object> implements IOperationRowsIterator {
private chunk: Array<object> = [];

private index: number = 0;

constructor(operation: IOperation, options?: IteratorOptions) {
super(operation, {
...options,
// Tell slicer to return raw chunks. We're going to process rows one by one anyway,
// so no need to additionally buffer and slice chunks returned by server
disableBuffering: true,
});
}

protected async getNext(): Promise<IteratorResult<object>> {
if (this.index < this.chunk.length) {
const value = this.chunk[this.index];
this.index += 1;
return { done: false, value };
}

const hasMoreRows = await this.operation.hasMoreRows();
if (hasMoreRows) {
this.chunk = await this.operation.fetchChunk(this.options);
this.index = 0;
// Note: this call is not really a recursion. Since this method is
// async - the call will be actually scheduled for processing on
// the next event loop cycle
return this.getNext();
}

return { done: true, value: undefined };
}
}
87 changes: 87 additions & 0 deletions tests/e2e/iterators.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
const { expect } = require('chai');
const sinon = require('sinon');
const config = require('./utils/config');
const { DBSQLClient } = require('../../lib');

async function openSession(customConfig) {
const client = new DBSQLClient();

const clientConfig = client.getConfig();
sinon.stub(client, 'getConfig').returns({
...clientConfig,
...customConfig,
});

const connection = await client.connect({
host: config.host,
path: config.path,
token: config.token,
});

return connection.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
});
}

function arrayChunks(arr, chunkSize) {
const result = [];

while (arr.length > 0) {
const chunk = arr.splice(0, chunkSize);
result.push(chunk);
}

return result;
}

describe('Iterators', () => {
it('should iterate over all chunks', async () => {
const session = await openSession({ arrowEnabled: false });
sinon.spy(session.context.driver, 'fetchResults');
try {
const expectedRowsCount = 10;

// set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`, {
maxRows: null,
});

const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id }));
const chunkSize = 4;
const expectedChunks = arrayChunks(expectedRows, chunkSize);

let index = 0;
for await (const chunk of operation.iterateChunks({ maxRows: chunkSize })) {
expect(chunk).to.deep.equal(expectedChunks[index]);
index += 1;
}

expect(index).to.equal(expectedChunks.length);
} finally {
await session.close();
}
});

it('should iterate over all rows', async () => {
const session = await openSession({ arrowEnabled: false });
sinon.spy(session.context.driver, 'fetchResults');
try {
const expectedRowsCount = 10;

const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`);

const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id }));

let index = 0;
for await (const row of operation.iterateRows()) {
expect(row).to.deep.equal(expectedRows[index]);
index += 1;
}

expect(index).to.equal(expectedRows.length);
} finally {
await session.close();
}
});
});
Original file line number Diff line number Diff line change
@@ -1,97 +1,5 @@
const { expect, AssertionError } = require('chai');

const { buildUserAgentString, definedOrError, formatProgress, ProgressUpdateTransformer } = require('../../lib/utils');
const CloseableCollection = require('../../lib/utils/CloseableCollection').default;

describe('buildUserAgentString', () => {
// It should follow https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3 and
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
//
// UserAgent ::= <ProductName> '/' <ProductVersion> '(' <Comment> ')'
// ProductName ::= 'NodejsDatabricksSqlConnector'
// <Comment> ::= [ <ClientId> ';' ] 'Node.js' <NodeJsVersion> ';' <OSPlatform> <OSVersion>
//
// Examples:
// - with <ClientId> provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Client ID; Node.js 16.13.1; Darwin 21.5.0)
// - without <ClientId> provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Node.js 16.13.1; Darwin 21.5.0)

function checkUserAgentString(ua, clientId) {
// Prefix: 'NodejsDatabricksSqlConnector/'
// Version: three period-separated digits and optional suffix
const re =
/^(?<productName>NodejsDatabricksSqlConnector)\/(?<productVersion>\d+\.\d+\.\d+(-[^(]+)?)\s*\((?<comment>[^)]+)\)$/i;
const match = re.exec(ua);
expect(match).to.not.be.eq(null);

const { comment } = match.groups;

expect(comment.split(';').length).to.be.gte(2); // at least Node and OS version should be there

if (clientId) {
expect(comment.trim()).to.satisfy((s) => s.startsWith(`${clientId};`));
}
}

it('matches pattern with clientId', () => {
const clientId = 'Some Client ID';
const ua = buildUserAgentString(clientId);
checkUserAgentString(ua, clientId);
});

it('matches pattern without clientId', () => {
const ua = buildUserAgentString();
checkUserAgentString(ua);
});
});

describe('formatProgress', () => {
it('formats progress', () => {
const result = formatProgress({
headerNames: [],
rows: [],
});
expect(result).to.be.eq('\n');
});
});

describe('ProgressUpdateTransformer', () => {
it('should have equal columns', () => {
const t = new ProgressUpdateTransformer();

expect(t.formatRow(['Column 1', 'Column 2'])).to.be.eq('Column 1 |Column 2 ');
});

it('should format response as table', () => {
const t = new ProgressUpdateTransformer({
headerNames: ['Column 1', 'Column 2'],
rows: [
['value 1.1', 'value 1.2'],
['value 2.1', 'value 2.2'],
],
footerSummary: 'footer',
});

expect(String(t)).to.be.eq(
'Column 1 |Column 2 \n' + 'value 1.1 |value 1.2 \n' + 'value 2.1 |value 2.2 \n' + 'footer',
);
});
});

describe('definedOrError', () => {
it('should return value if it is defined', () => {
const values = [null, 0, 3.14, false, true, '', 'Hello, World!', [], {}];
for (const value of values) {
const result = definedOrError(value);
expect(result).to.be.equal(value);
}
});

it('should throw error if value is undefined', () => {
expect(() => {
definedOrError(undefined);
}).to.throw();
});
});
const CloseableCollection = require('../../../lib/utils/CloseableCollection').default;

describe('CloseableCollection', () => {
it('should add item if not already added', () => {
Expand Down
Loading