From afe35cd66352a08eeeb3408f32c268e5058bbf47 Mon Sep 17 00:00:00 2001 From: Alexander Fenster Date: Mon, 22 Jan 2018 08:12:03 -0800 Subject: [PATCH 1/4] feature: batch queries --- protos/google/spanner/v1/spanner.proto | 170 +++++++++++- samples/batch.js | 246 ++++++++++++++++++ samples/system-test/spanner.test.js | 75 ++++++ src/batch-transaction.js | 343 +++++++++++++++++++++++++ src/codec.js | 51 ++++ src/database.js | 205 +++++++++++++++ src/index.js | 2 +- src/transaction-request.js | 41 +-- src/v1/spanner_client.js | 233 ++++++++++++++++- src/v1/spanner_client_config.json | 10 + test/batch-transaction.js | 336 ++++++++++++++++++++++++ test/codec.js | 189 ++++++++++++++ test/database.js | 185 +++++++++++++ test/index.js | 11 +- test/transaction-request.js | 267 ++----------------- 15 files changed, 2064 insertions(+), 300 deletions(-) create mode 100644 samples/batch.js create mode 100644 src/batch-transaction.js create mode 100644 test/batch-transaction.js diff --git a/protos/google/spanner/v1/spanner.proto b/protos/google/spanner/v1/spanner.proto index 0e51e8275..aeadeb25d 100644 --- a/protos/google/spanner/v1/spanner.proto +++ b/protos/google/spanner/v1/spanner.proto @@ -1,4 +1,4 @@ -// Copyright 2017 Google Inc. +// Copyright 2018 Google Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -31,6 +31,7 @@ option go_package = "google.golang.org/genproto/googleapis/spanner/v1;spanner"; option java_multiple_files = true; option java_outer_classname = "SpannerProto"; option java_package = "com.google.spanner.v1"; +option php_namespace = "Google\\Cloud\\Spanner\\V1"; // Cloud Spanner API @@ -159,6 +160,30 @@ service Spanner { rpc Rollback(RollbackRequest) returns (google.protobuf.Empty) { option (google.api.http) = { post: "/v1/{session=projects/*/instances/*/databases/*/sessions/*}:rollback" body: "*" }; } + + // Creates a set of partition tokens that can be used to execute a query + // operation in parallel. Each of the returned partition tokens can be used + // by [ExecuteStreamingSql][google.spanner.v1.Spanner.ExecuteStreamingSql] to specify a subset + // of the query result to read. The same session and read-only transaction + // must be used by the PartitionQueryRequest used to create the + // partition tokens and the ExecuteSqlRequests that use the partition tokens. + // Partition tokens become invalid when the session used to create them + // is deleted or begins a new transaction. + rpc PartitionQuery(PartitionQueryRequest) returns (PartitionResponse) { + option (google.api.http) = { post: "/v1/{session=projects/*/instances/*/databases/*/sessions/*}:partitionQuery" body: "*" }; + } + + // Creates a set of partition tokens that can be used to execute a read + // operation in parallel. Each of the returned partition tokens can be used + // by [StreamingRead][google.spanner.v1.Spanner.StreamingRead] to specify a subset of the read + // result to read. The same session and read-only transaction must be used by + // the PartitionReadRequest used to create the partition tokens and the + // ReadRequests that use the partition tokens. + // Partition tokens become invalid when the session used to create them + // is deleted or begins a new transaction. 
+ rpc PartitionRead(PartitionReadRequest) returns (PartitionResponse) { + option (google.api.http) = { post: "/v1/{session=projects/*/instances/*/databases/*/sessions/*}:partitionRead" body: "*" }; + } } // The request for [CreateSession][google.spanner.v1.Spanner.CreateSession]. @@ -172,7 +197,8 @@ message CreateSessionRequest { // A session in the Cloud Spanner API. message Session { - // The name of the session. + // The name of the session. This is always system-assigned; values provided + // when creating a session are ignored. string name = 1; // The labels for the session. @@ -307,8 +333,131 @@ message ExecuteSqlRequest { bytes resume_token = 6; // Used to control the amount of debugging information returned in - // [ResultSetStats][google.spanner.v1.ResultSetStats]. + // [ResultSetStats][google.spanner.v1.ResultSetStats]. If [partition_token][google.spanner.v1.ExecuteSqlRequest.partition_token] is set, [query_mode][google.spanner.v1.ExecuteSqlRequest.query_mode] can only + // be set to [QueryMode.NORMAL][google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL]. QueryMode query_mode = 7; + + // If present, results will be restricted to the specified partition + // previously created using PartitionQuery(). There must be an exact + // match for the values of fields common to this message and the + // PartitionQueryRequest message used to create this partition_token. + bytes partition_token = 8; +} + +// Options for a PartitionQueryRequest and +// PartitionReadRequest. +message PartitionOptions { + // The desired data size for each partition generated. The default for this + // option is currently 1 GiB. This is only a hint. The actual size of each + // partition may be smaller or larger than this size request. + int64 partition_size_bytes = 1; + + // The desired maximum number of partitions to return. For example, this may + // be set to the number of workers available. The default for this option + // is currently 10,000. The maximum value is currently 200,000. 
This is only + // a hint. The actual number of partitions returned may be smaller or larger + // than this maximum count request. + int64 max_partitions = 2; +} + +// The request for [PartitionQuery][google.spanner.v1.Spanner.PartitionQuery] +message PartitionQueryRequest { + // Required. The session used to create the partitions. + string session = 1; + + // Read only snapshot transactions are supported, read/write and single use + // transactions are not. + TransactionSelector transaction = 2; + + // The query request to generate partitions for. The request will fail if + // the query is not root partitionable. The query plan of a root + // partitionable query has a single distributed union operator. A distributed + // union operator conceptually divides one or more tables into multiple + // splits, remotely evaluates a subquery independently on each split, and + // then unions all results. + string sql = 3; + + // The SQL query string can contain parameter placeholders. A parameter + // placeholder consists of `'@'` followed by the parameter + // name. Parameter names consist of any combination of letters, + // numbers, and underscores. + // + // Parameters can appear anywhere that a literal value is expected. The same + // parameter name can be used more than once, for example: + // `"WHERE id > @msg_id AND id < @msg_id + 100"` + // + // It is an error to execute an SQL query with unbound parameters. + // + // Parameter values are specified using `params`, which is a JSON + // object whose keys are parameter names, and whose values are the + // corresponding parameter values. + google.protobuf.Struct params = 4; + + // It is not always possible for Cloud Spanner to infer the right SQL type + // from a JSON value. For example, values of type `BYTES` and values + // of type `STRING` both appear in [params][google.spanner.v1.PartitionQueryRequest.params] as JSON strings. 
+ // + // In these cases, `param_types` can be used to specify the exact + // SQL type for some or all of the SQL query parameters. See the + // definition of [Type][google.spanner.v1.Type] for more information + // about SQL types. + map<string, Type> param_types = 5; + + // Additional options that affect how many partitions are created. + PartitionOptions partition_options = 6; +} + +// The request for [PartitionRead][google.spanner.v1.Spanner.PartitionRead] +message PartitionReadRequest { + // Required. The session used to create the partitions. + string session = 1; + + // Read only snapshot transactions are supported, read/write and single use + // transactions are not. + TransactionSelector transaction = 2; + + // Required. The name of the table in the database to be read. + string table = 3; + + // If non-empty, the name of an index on [table][google.spanner.v1.PartitionReadRequest.table]. This index is + // used instead of the table primary key when interpreting [key_set][google.spanner.v1.PartitionReadRequest.key_set] + // and sorting result rows. See [key_set][google.spanner.v1.PartitionReadRequest.key_set] for further information. + string index = 4; + + // The columns of [table][google.spanner.v1.PartitionReadRequest.table] to be returned for each row matching + // this request. + repeated string columns = 5; + + // Required. `key_set` identifies the rows to be yielded. `key_set` names the + // primary keys of the rows in [table][google.spanner.v1.PartitionReadRequest.table] to be yielded, unless [index][google.spanner.v1.PartitionReadRequest.index] + // is present. If [index][google.spanner.v1.PartitionReadRequest.index] is present, then [key_set][google.spanner.v1.PartitionReadRequest.key_set] instead names + // index keys in [index][google.spanner.v1.PartitionReadRequest.index]. + // + // It is not an error for the `key_set` to name rows that do not + // exist in the database. Read yields nothing for nonexistent rows.
+ KeySet key_set = 6; + + // Additional options that affect how many partitions are created. + PartitionOptions partition_options = 9; +} + +// Information returned for each partition returned in a +// PartitionResponse. +message Partition { + // This token can be passed to Read, StreamingRead, ExecuteSql, or + // ExecuteStreamingSql requests to restrict the results to those identified by + // this partition token. + bytes partition_token = 1; +} + +// The response for [PartitionQuery][google.spanner.v1.Spanner.PartitionQuery] +// or [PartitionRead][google.spanner.v1.Spanner.PartitionRead] +message PartitionResponse { + // Partitions created by this request. + repeated Partition partitions = 1; + + // Transaction created by this request. + Transaction transaction = 2; } // The request for [Read][google.spanner.v1.Spanner.Read] and @@ -338,15 +487,18 @@ message ReadRequest { // is present. If [index][google.spanner.v1.ReadRequest.index] is present, then [key_set][google.spanner.v1.ReadRequest.key_set] instead names // index keys in [index][google.spanner.v1.ReadRequest.index]. // - // Rows are yielded in table primary key order (if [index][google.spanner.v1.ReadRequest.index] is empty) - // or index key order (if [index][google.spanner.v1.ReadRequest.index] is non-empty). + // If the [partition_token][google.spanner.v1.ReadRequest.partition_token] field is empty, rows are yielded + // in table primary key order (if [index][google.spanner.v1.ReadRequest.index] is empty) or index key order + // (if [index][google.spanner.v1.ReadRequest.index] is non-empty). If the [partition_token][google.spanner.v1.ReadRequest.partition_token] field is not + // empty, rows will be yielded in an unspecified order. // // It is not an error for the `key_set` to name rows that do not // exist in the database. Read yields nothing for nonexistent rows. KeySet key_set = 6; // If greater than zero, only the first `limit` rows are yielded. If `limit` - // is zero, the default is no limit. 
+ // is zero, the default is no limit. A limit cannot be specified if + // `partition_token` is set. int64 limit = 8; // If this request is resuming a previously interrupted read, @@ -356,6 +508,12 @@ message ReadRequest { // rest of the request parameters must exactly match the request // that yielded this token. bytes resume_token = 9; + + // If present, results will be restricted to the specified partition + // previously created using PartitionRead(). There must be an exact + // match for the values of fields common to this message and the + // PartitionReadRequest message used to create this partition_token. + bytes partition_token = 10; } // The request for [BeginTransaction][google.spanner.v1.Spanner.BeginTransaction]. diff --git a/samples/batch.js b/samples/batch.js new file mode 100644 index 000000000..a2775675c --- /dev/null +++ b/samples/batch.js @@ -0,0 +1,246 @@ +/** + * Copyright 2018, Google, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +function createBatchTransaction(instanceId, databaseId, projectId) { + // [START create_batch_transaction] + // Imports the Google Cloud client library + const Spanner = require('@google-cloud/spanner'); + + /** + * TODO(developer): Uncomment the following lines before running the sample. 
+ */ + // const projectId = 'my-project-id'; + // const instanceId = 'my-instance'; + // const databaseId = 'my-database'; + + // Creates a client + const spanner = new Spanner({ + projectId: projectId, + }); + + // Gets a reference to a Cloud Spanner instance and database + const instance = spanner.instance(instanceId); + const database = instance.database(databaseId); + + database + .createBatchTransaction() + .then(data => { + const transaction = data[0]; + + console.log(`Created batch transaction for ${databaseId}`); + + // Close the transaction when finished. + return transaction.close(); + }) + .then( + () => { + console.log(`Closed batch transaction`); + }, + err => { + console.error('ERROR:', err); + } + ); + // [END create_batch_transaction] +} + +function createQueryPartitions(instanceId, databaseId, identifier, projectId) { + // [START create_query_partitions] + // Imports the Google Cloud client library + const Spanner = require('@google-cloud/spanner'); + + /** + * TODO(developer): Uncomment the following lines before running the sample. 
+ */ + // const projectId = 'my-project-id'; + // const instanceId = 'my-instance'; + // const databaseId = 'my-database'; + // const identifier = {}; + + // Creates a client + const spanner = new Spanner({ + projectId: projectId, + }); + + // Gets a reference to a Cloud Spanner instance and database + const instance = spanner.instance(instanceId); + const database = instance.database(databaseId); + const transaction = database.batchTransaction(identifier); + + const query = 'SELECT * FROM Singers'; + + transaction + .createQueryPartitions(query) + .then(data => { + const partitions = data[0]; + console.log(`Successfully created query partitions.`); + }) + .catch(err => { + console.error('ERROR:', err); + }); + // [END create_query_partitions] +} + +function createReadPartitions(instanceId, databaseId, identifier, projectId) { + // [START create_read_partitions] + // Imports the Google Cloud client library + const Spanner = require('@google-cloud/spanner'); + + /** + * TODO(developer): Uncomment the following lines before running the sample. 
+ */ + // const projectId = 'my-project-id'; + // const instanceId = 'my-instance'; + // const databaseId = 'my-database'; + // const identifier = {}; + + // Creates a client + const spanner = new Spanner({ + projectId: projectId, + }); + + // Gets a reference to a Cloud Spanner instance and database + const instance = spanner.instance(instanceId); + const database = instance.database(databaseId); + const transaction = database.batchTransaction(identifier); + + const options = { + table: 'Singers', + keys: ['1'], + columns: ['SingerId'], + }; + + transaction + .createReadPartitions(options) + .then(data => { + const partitions = data[0]; + console.log(`Successfully created read partitions.`); + }) + .catch(err => { + console.error('ERROR:', err); + }); + // [END create_read_partitions] +} + +function executePartition( + instanceId, + databaseId, + identifier, + partition, + projectId +) { + // [START execute_partition] + // Imports the Google Cloud client library + const Spanner = require('@google-cloud/spanner'); + + /** + * TODO(developer): Uncomment the following lines before running the sample. 
+ */ + // const projectId = 'my-project-id'; + // const instanceId = 'my-instance'; + // const databaseId = 'my-database'; + // const identifier = {}; + // const partition = {}; + + // Creates a client + const spanner = new Spanner({ + projectId: projectId, + }); + + // Gets a reference to a Cloud Spanner instance and database + const instance = spanner.instance(instanceId); + const database = instance.database(databaseId); + const transaction = database.batchTransaction(identifier); + + transaction + .execute(partition) + .then(data => { + const rows = data[0]; + console.log(`Successfully executed partition.`); + }) + .catch(err => { + console.error('ERROR:', err); + }); + // [END execute_partition] +} + +require(`yargs`) + .demand(1) + .command( + `create-batch-transaction <instanceName> <databaseName>`, + 'Creates a batch transaction for an example Cloud Spanner database.', + {}, + opts => + createBatchTransaction( + opts.instanceName, + opts.databaseName, + opts.projectId + ) + ) + .command( + `create-query-partitions <instanceName> <databaseName> <identifier>`, + 'Creates query partitions.', + {}, + opts => + createQueryPartitions( + opts.instanceName, + opts.databaseName, + JSON.parse(opts.identifier), + opts.projectId + ) + ) + .command( + `create-read-partitions <instanceName> <databaseName> <identifier>`, + 'Creates read partitions.', + {}, + opts => + createReadPartitions( + opts.instanceName, + opts.databaseName, + JSON.parse(opts.identifier), + opts.projectId + ) + ) + .command( + `execute-partition <instanceName> <databaseName> <identifier> <partition>`, + 'Executes a partition.', + {}, + opts => + executePartition( + opts.instanceName, + opts.databaseName, + JSON.parse(opts.identifier), + JSON.parse(opts.partition), + opts.projectId + ) + ) + .example( + `node $0 create-batch-transaction "my-instance" "my-database" "{}" "my-project-id"` + ) + .example( + `node $0 create-query-partitions "my-instance" "my-database" "{}" "my-project-id"` + ) + .example( + `node $0 create-read-partitions "my-instance" "my-database" "{}" "my-project-id"` + ) + .example( + `node $0 execute-partition "my-instance" "my-database" "{}" "{}"
"my-project-id"` + ) + .wrap(120) + .recommendCommands() + .epilogue(`For more information, see https://cloud.google.com/spanner/docs`) + .strict() + .help().argv; diff --git a/samples/system-test/spanner.test.js b/samples/system-test/spanner.test.js index 86bd7346e..cb2e0b508 100644 --- a/samples/system-test/spanner.test.js +++ b/samples/system-test/spanner.test.js @@ -20,6 +20,7 @@ const Spanner = require(`@google-cloud/spanner`); const test = require(`ava`); const tools = require(`@google-cloud/nodejs-repo-tools`); +const batchCmd = `node batch.js`; const crudCmd = `node crud.js`; const schemaCmd = `node schema.js`; const indexingCmd = `node indexing.js`; @@ -294,3 +295,77 @@ test.serial( t.regex(output, /SingerId: 2, AlbumId: 2, MarketingBudget: 300000/); } ); + +// create_batch_transaction +test.serial(`should create a batch transaction`, async t => { + let results = await tools.runAsyncWithIO( + `${batchCmd} create-batch-transaction ${INSTANCE_ID} ${DATABASE_ID} ${PROJECT_ID}`, + cwd + ); + + let output = results.stdout + results.stderr; + + t.regex(output, new RegExp(`Created batch transaction for ${DATABASE_ID}`)); + t.regex(output, new RegExp(`Closed batch transaction`)); +}); + +// create_query_partitions +test.serial(`should create query partitions`, async t => { + const instance = spanner.instance(INSTANCE_ID); + const database = instance.database(DATABASE_ID); + const [transaction] = await database.createBatchTransaction(); + const identifier = JSON.stringify(transaction.identifier()); + + let results = await tools.runAsyncWithIO( + `${batchCmd} create-query-partitions ${INSTANCE_ID} ${DATABASE_ID} '${identifier}' ${PROJECT_ID}`, + cwd + ); + + let output = results.stdout + results.stderr; + + t.regex(output, /Successfully created query partitions\./); + + await transaction.close(); +}); + +// create_read_partitions +test.serial(`should create read partitions`, async t => { + const instance = spanner.instance(INSTANCE_ID); + const database = 
instance.database(DATABASE_ID); + const [transaction] = await database.createBatchTransaction(); + const identifier = JSON.stringify(transaction.identifier()); + + let results = await tools.runAsyncWithIO( + `${batchCmd} create-read-partitions ${INSTANCE_ID} ${DATABASE_ID} '${identifier}' ${PROJECT_ID}`, + cwd + ); + + let output = results.stdout + results.stderr; + + t.regex(output, /Successfully created read partitions\./); + + await transaction.close(); +}); + +// execute_partition +test.serial(`should execute a partition`, async t => { + const instance = spanner.instance(INSTANCE_ID); + const database = instance.database(DATABASE_ID); + const [transaction] = await database.createBatchTransaction(); + const identifier = JSON.stringify(transaction.identifier()); + + const query = `SELECT SingerId FROM Albums`; + const [partitions] = await transaction.createQueryPartitions(query); + const partition = JSON.stringify(partitions[0]); + + let results = await tools.runAsyncWithIO( + `${batchCmd} execute-partition ${INSTANCE_ID} ${DATABASE_ID} '${identifier}' '${partition}' ${PROJECT_ID}`, + cwd + ); + + let output = results.stdout + results.stderr; + + t.regex(output, /Successfully executed partition\./); + + await transaction.close(); +}); diff --git a/src/batch-transaction.js b/src/batch-transaction.js new file mode 100644 index 000000000..5f36d04ac --- /dev/null +++ b/src/batch-transaction.js @@ -0,0 +1,343 @@ +/*! + * Copyright 2018 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +var common = require('@google-cloud/common'); +var extend = require('extend'); +var is = require('is'); +var util = require('util'); + +var codec = require('./codec.js'); +var Transaction = require('./transaction.js'); + +/** + * Use a BatchTransaction object to create partitions and read/query against + * your Cloud Spanner database. + * + * @class + * @extends Transaction + * + * @param {TransactionOptions} [options] [Transaction options](https://cloud.google.com/spanner/docs/timestamp-bounds). + * + * @example include:samples/batch.js + * region_tag:create_batch_transaction + */ +function BatchTransaction(session) { + Transaction.call(this, session, {readOnly: true}); +} + +util.inherits(BatchTransaction, Transaction); + +/** + * Closes all open resources. + * + * When the transaction is no longer needed, you should call this method to free + * up resources allocated by the Batch client. + * + * Calling this method would render the transaction unusable everywhere. In + * particular if this transaction object was being used across multiple + * machines, calling this method on any of the machine would make the + * transaction unusable on all the machines. This should only be called when the + * transaction is no longer needed anywhere + * + * @param {BasicCallback} [callback] Callback function. + * @returns {Promise} + * + * @example + * const Spanner = require('@google-cloud/spanner'); + * const spanner = new Spanner(); + * + * const instance = spanner.instance('my-instance'); + * const database = instance.database('my-database'); + * + * database.createBatchTransaction(function(err, transaction) { + * if (err) { + * // Error handling omitted. + * } + * + * transaction.close(function(err, apiResponse) {}); + * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. 
+ * //- + * database.createBatchTransaction().then(function(data) { + * var transaction = data[0]; + * return transaction.close(); + * }); + */ +BatchTransaction.prototype.close = function(callback) { + this.session.delete(callback); +}; + +/** + * @see [`ExecuteSqlRequest`](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest) + * @typedef {object} QueryPartition + * @property {string} partitionToken The partition token. + */ +/** + * @typedef {array} CreateQueryPartitionsResponse + * @property {QueryPartition[]} 0 List of query partitions. + * @property {object} 1 The full API response. + */ +/** + * @callback CreateQueryPartitionsCallback + * @param {?Error} err Request error, if any. + * @param {QueryPartition[]} partitions List of query partitions. + * @param {object} apiResponse The full API response. + */ +/** + * Creates a set of query partitions that can be used to execute a query + * operation in parallel. Partitions become invalid when the transaction used + * to create them is closed. + * + * @param {string|object} query A SQL query or + * [`ExecuteSqlRequest`](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest) + * object. + * @param {object} [query.params] A map of parameter name to values. + * @param {object} [query.partitionOptions] A map of partition options. + * @param {object} [query.types] A map of parameter types. + * @param {CreateQueryPartitionsCallback} [callback] Callback function. 
+ * @returns {Promise} + * + * @example include:samples/batch.js + * region_tag:create_query_partitions + */ +BatchTransaction.prototype.createQueryPartitions = function(query, callback) { + if (is.string(query)) { + query = { + sql: query, + }; + } + + var reqOpts = codec.encodeQuery(query); + var gaxOpts = query.gaxOptions; + + if (gaxOpts) { + delete reqOpts.gaxOptions; + } + + this.createPartitions_( + { + client: 'SpannerClient', + method: 'partitionQuery', + reqOpts, + gaxOpts, + }, + callback + ); +}; + +/** + * Generic create partition method. Handles common parameters used in both + * {@link BatchTransaction#createQueryPartitions} and {@link BatchTransaction#createReadPartitions} + * + * @private + * + * @param {object} config The request config. + * @param {function} Callback function. + */ +BatchTransaction.prototype.createPartitions_ = function(config, callback) { + var self = this; + + var query = extend({}, config.reqOpts, { + session: this.session.formattedName_, + transaction: {id: this.id}, + }); + + config.reqOpts = extend({}, query); + delete query.partitionOptions; + + this.request(config, function(err, resp) { + if (err) { + callback(err, null, resp); + return; + } + + var partitions = resp.partitions.map(function(partition) { + return extend({}, query, partition); + }); + + if (resp.transaction) { + self.id = resp.transaction.id; + self.readTimestamp = resp.transaction.readTimestamp; + } + + callback(null, partitions, resp); + }); +}; + +/** + * @typedef {object} ReadPartition + * @mixes ReadRequestOptions + * @property {string} partitionToken The partition token. + */ +/** + * @typedef {array} CreateReadPartitionsResponse + * @property {ReadPartition[]} 0 List of read partitions. + * @property {object} 1 The full API response. + */ +/** + * @callback CreateReadPartitionsCallback + * @param {?Error} err Request error, if any. + * @param {ReadPartition[]} partitions List of read partitions. + * @param {object} apiResponse The full API response. 
+ */ +/** + * Creates a set of read partitions that can be used to execute a read + * operation in parallel. Partitions become invalid when the transaction used + * to create them is closed. + * + * @param {ReadRequestOptions} options Configuration object, describing what to + * read from. + * @param {CreateReadPartitionsCallback} [callback] Callback function. + * @returns {Promise} + * + * @example include:samples/batch.js + * region_tag:create_read_partitions + */ +BatchTransaction.prototype.createReadPartitions = function(options, callback) { + var reqOpts = codec.encodeRead(options); + var gaxOpts = options.gaxOptions; + + if (gaxOpts) { + delete reqOpts.gaxOptions; + } + + this.createPartitions_( + { + client: 'SpannerClient', + method: 'partitionRead', + reqOpts, + gaxOpts, + }, + callback + ); +}; + +/** + * Executes partition. + * + * @see {@link Transaction#read} when using {@link ReadPartition}. + * @see {@link Transaction#run} when using {@link QueryParition}. + * + * @param {ReadPartition|QueryParition} partition The partition object. + * @param {TransactionRequestReadCallback|RunCallback} callback Callback + * function. + * @returns {Promise|Promise} + * + * @example include:samples/batch.js + * region_tag:execute_partition + */ +BatchTransaction.prototype.execute = function(partition, callback) { + if (is.string(partition.table)) { + this.read(partition.table, partition, callback); + return; + } + + this.run(partition, callback); +}; + +/** + * Executes partition in streaming mode. + * + * @see {@link Transaction#createReadStream} when using {@link ReadPartition}. + * @see {@link Transaction#runStream} when using {@link QueryParition}. + * + * @param {ReadPartition|QueryPartition} partition The partition object. + * @returns {ReadableStream} A readable stream that emits rows. 
+ * + * @example + * const Spanner = require('@google-cloud/spanner'); + * const spanner = new Spanner(); + * + * const instance = spanner.instance('my-instance'); + * const database = instance.database('my-database'); + * const transaction = database.batchTransaction(identifier); + * + * transaction.createReadPartitions(options, function(err, partitions) { + * const partition = partitions[0]; + * + * transaction + * .executeStream(partition) + * .on('error', function(err) {}) + * .on('data', function(row) { + * // row = [ + * // { + * // name: 'SingerId', + * // value: '1' + * // }, + * // { + * // name: 'Name', + * // value: 'Eddie Wilson' + * // } + * // ] + * }) + * .on('end', function() { + * // All results retrieved + * }); + * }); + */ +BatchTransaction.prototype.executeStream = function(partition) { + if (is.string(partition.table)) { + return this.createReadStream(partition.table, partition); + } + + return this.runStream(partition); +}; + +/** + * @typedef {object} TransactionIdentifier + * @property {string} session The full session name. + * @property {string} transaction The transaction ID. + * @property {string|Date} readTimestamp The transaction read timestamp. + */ +/** + * Creates a transaction identifier used to reference the transaction in + * workers. + * + * @returns {TransactionIdentifier} + * + * @example + * const Spanner = require('@google-cloud/spanner'); + * const spanner = new Spanner(); + * + * const instance = spanner.instance('my-instance'); + * const database = instance.database('my-database'); + * + * database.createBatchTransaction(function(err, transaction) { + * const identifier = transaction.identifier(); + * }); + */ +BatchTransaction.prototype.identifier = function() { + return { + transaction: this.id.toString('base64'), + session: this.session.id, + timestamp: this.readTimestamp, + }; +}; + +/*! 
Developer Documentation + * + * All async methods (except for streams) will return a Promise in the event + * that a callback is omitted. + */ +common.util.promisifyAll(BatchTransaction, { + exclude: ['identifier'], +}); + +module.exports = BatchTransaction; diff --git a/src/codec.js b/src/codec.js index 91606ff61..409470fa0 100644 --- a/src/codec.js +++ b/src/codec.js @@ -18,6 +18,7 @@ var codec = module.exports; +var arrify = require('arrify'); var Buffer = require('safe-buffer').Buffer; var commonGrpc = require('@google-cloud/common-grpc'); var extend = require('extend'); @@ -383,3 +384,53 @@ function encodeQuery(query) { } codec.encodeQuery = encodeQuery; + +/** + * Encodes a ReadRequest into the correct format. + * + * @private + * + * @param {object|string|string[]} query The query + * @returns {object} + */ +function encodeRead(query) { + if (is.array(query) || is.string(query)) { + query = { + keys: query, + }; + } + + var encoded = extend({}, query); + + if (query.keys || query.ranges) { + encoded.keySet = {}; + } + + if (query.keys) { + encoded.keySet.keys = arrify(query.keys).map(function(key) { + return { + values: arrify(key).map(codec.encode), + }; + }); + delete encoded.keys; + } + + if (query.ranges) { + encoded.keySet.ranges = arrify(query.ranges).map(function(rawRange) { + var range = extend({}, rawRange); + + for (var bound in range) { + range[bound] = { + values: arrify(range[bound]).map(codec.encode), + }; + } + + return range; + }); + delete encoded.ranges; + } + + return encoded; +} + +codec.encodeRead = encodeRead; diff --git a/src/database.js b/src/database.js index 15c9ff92b..df5ca133e 100644 --- a/src/database.js +++ b/src/database.js @@ -24,6 +24,7 @@ var extend = require('extend'); var is = require('is'); var modelo = require('modelo'); +var BatchTransaction = require('./batch-transaction.js'); var codec = require('./codec.js'); var PartialResultStream = require('./partial-result-stream.js'); var Session = require('./session.js'); @@ 
-221,6 +222,44 @@ Database.formatName_ = function(instanceName, name) { return instanceName + '/databases/' + databaseName; }; +/** + * Get a reference to a {@link BatchTransaction} object. + * + * @see {@link BatchTransaction#identifier} to generate an identifier. + * + * @param {TransactionIdentifier} identifier The transaction identifier. + * @param {TransactionOptions} [options] [Transaction options](https://cloud.google.com/spanner/docs/timestamp-bounds). + * @returns {BatchTransaction} A batch transaction object. + * + * @example + * const Spanner = require('@google-cloud/spanner'); + * const spanner = new Spanner(); + * + * const instance = spanner.instance('my-instance'); + * const database = instance.database('my-database'); + * + * const transaction = database.batchTransaction({ + * session: 'my-session', + * transaction: 'my-transaction', + * readTimestamp: 1518464696657 + * }); + */ +Database.prototype.batchTransaction = function(identifier) { + var session = identifier.session; + var id = identifier.transaction; + + if (is.string(session)) { + session = this.session_(session); + } + + var transaction = new BatchTransaction(session); + + transaction.id = id; + transaction.readTimestamp = identifier.readTimestamp; + + return transaction; +}; + /** * @callback CloseDatabaseCallback * @param {?Error} err Request error, if any. @@ -270,6 +309,56 @@ Database.prototype.close = function(callback) { this.pool_.close().then(() => callback(leakError), callback); }; +/** + * @typedef {array} CreateTransactionResponse + * @property {BatchTransaction} 0 The {@link BatchTransaction}. + * @property {object} 1 The full API response. + */ +/** + * @callback CreateTransactionCallback + * @param {?Error} err Request error, if any. + * @param {BatchTransaction} transaction The {@link BatchTransaction}. + * @param {object} apiResponse The full API response. + */ +/** + * Create a transaction that can be used for batch querying. 
+ * + * @param {TransactionOptions} [options] [Transaction options](https://cloud.google.com/spanner/docs/timestamp-bounds). + * @param {CreateTransactionCallback} [callback] Callback function. + * @returns {Promise} + * + * @example include:samples/batch.js + * region_tag:create_batch_transaction + */ +Database.prototype.createBatchTransaction = function(options, callback) { + var self = this; + + if (is.fn(options)) { + callback = options; + options = null; + } + + this.createSession(function(err, session, resp) { + if (err) { + callback(err, null, resp); + return; + } + + var transaction = self.batchTransaction({session}); + + transaction.options = extend({}, options); + + transaction.begin(function(err, resp) { + if (err) { + callback(err, null, resp); + return; + } + + callback(null, transaction, resp); + }); + }); +}; + /** * @typedef {array} CreateTableResponse * @property {Table} 0 The new {@link Table}. @@ -524,6 +613,121 @@ Database.prototype.getSchema = function(callback) { ); }; +/** + * Options object for listing sessions. + * + * @typedef {object} GetSessionsRequest + * @property {boolean} [autoPaginate=true] Have pagination handled + * automatically. + * @property {string} [filter] An expression for filtering the results of the + * request. Filter rules are case insensitive. The fields eligible for + * filtering are: + * - **`name`** + * - **`display_name`** + * - **`labels.key`** where key is the name of a label + * + * Some examples of using filters are: + * - **`name:*`** The instance has a name. + * - **`name:Howl`** The instance's name is howl. + * - **`labels.env:*`** The instance has the label env. + * - **`labels.env:dev`** The instance's label env has the value dev. + * - **`name:howl labels.env:dev`** The instance's name is howl and it has + * the label env with value dev. + * @property {number} [maxApiCalls] Maximum number of API calls to make. + * @property {number} [maxResults] Maximum number of items to return. 
+ * @property {number} [pageSize] Maximum number of results per page.
+ * @property {string} [pageToken] A previously-returned page token
+ *     representing part of the larger set of results to view.
+ */
+/**
+ * @typedef {array} GetSessionsResponse
+ * @property {Session[]} 0 Array of {@link Session} instances.
+ * @property {object} 1 The full API response.
+ */
+/**
+ * @callback GetSessionsCallback
+ * @param {?Error} err Request error, if any.
+ * @param {Session[]} sessions Array of {@link Session} instances.
+ * @param {object} apiResponse The full API response.
+ */
+/**
+ * Get a list of sessions.
+ *
+ * Wrapper around {@link v1.SpannerClient#listSessions}
+ *
+ * @see {@link v1.SpannerClient#listSessions}
+ * @see [ListSessions API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.ListSessions)
+ *
+ * @param {GetSessionsRequest} [options] Options object for listing sessions.
+ * @param {GetSessionsCallback} [callback] Callback function.
+ * @returns {Promise}
+ *
+ * @example
+ * const Spanner = require('@google-cloud/spanner');
+ * const spanner = new Spanner();
+ *
+ * const instance = spanner.instance('my-instance');
+ * const database = instance.database('my-database');
+ *
+ * database.getSessions(function(err, sessions) {
+ *   // `sessions` is an array of `Session` objects.
+ * });
+ *
+ * //-
+ * // To control how many API requests are made and page through the results
+ * // manually, set `autoPaginate` to `false`.
+ * //-
+ * function callback(err, sessions, nextQuery, apiResponse) {
+ *   if (nextQuery) {
+ *     // More results exist.
+ *     database.getSessions(nextQuery, callback);
+ *   }
+ * }
+ *
+ * database.getSessions({
+ *   autoPaginate: false
+ * }, callback);
+ *
+ * //-
+ * // If the callback is omitted, we'll return a Promise.
+ * //- + * database.getInstances().then(function(data) { + * const sessions = data[0]; + * }); + */ +Database.prototype.getSessions = function(options, callback) { + var self = this; + + if (is.fn(options)) { + callback = options; + options = {}; + } + + var gaxOpts = options.gaxOptions; + var reqOpts = extend({}, options, {database: this.formattedName_}); + delete reqOpts.gaxOptions; + + this.request( + { + client: 'SpannerClient', + method: 'listSessions', + reqOpts, + gaxOpts, + }, + function(err, sessions) { + if (sessions) { + arguments[1] = sessions.map(function(metadata) { + var session = self.session_(metadata.name); + session.metadata = metadata; + return session; + }); + } + + callback.apply(null, arguments); + } + ); +}; + /** * @typedef {array} GetTransactionResponse * @property {Transaction} 0 The transaction object. @@ -1294,6 +1498,7 @@ Database.prototype.session_ = function(name) { */ common.util.promisifyAll(Database, { exclude: [ + 'batchTransaction', 'delete', 'getMetadata', 'runTransaction', diff --git a/src/index.js b/src/index.js index dd60a767c..03d0c414e 100644 --- a/src/index.js +++ b/src/index.js @@ -196,7 +196,7 @@ function Spanner(options) { this.auth = googleAuth(this.options); var config = { - baseUrl: 'spanner.googleapis.com', + baseUrl: this.options.servicePath || gapic.v1.SpannerClient.servicePath, protosDir: path.resolve(__dirname, '../protos'), protoServices: { Operations: { diff --git a/src/transaction-request.js b/src/transaction-request.js index 228b0aa21..0bca167e9 100644 --- a/src/transaction-request.js +++ b/src/transaction-request.js @@ -257,18 +257,9 @@ TransactionRequest.fromProtoTimestamp_ = function(value) { TransactionRequest.prototype.createReadStream = function(table, query) { var self = this; - if (is.array(query) || is.string(query)) { - query = { - keys: query, - }; - } + var reqOpts = codec.encodeRead(query); - var reqOpts = extend( - { - table: table, - }, - query - ); + reqOpts.table = table; delete 
reqOpts.json; delete reqOpts.jsonOptions; @@ -279,34 +270,6 @@ TransactionRequest.prototype.createReadStream = function(table, query) { }; } - if (query.keys || query.ranges) { - reqOpts.keySet = {}; - } - - if (query.keys) { - reqOpts.keySet.keys = arrify(query.keys).map(function(key) { - return { - values: arrify(key).map(codec.encode), - }; - }); - delete reqOpts.keys; - } - - if (query.ranges) { - reqOpts.keySet.ranges = arrify(query.ranges).map(function(rawRange) { - var range = extend({}, rawRange); - - for (var bound in range) { - range[bound] = { - values: arrify(range[bound]).map(codec.encode), - }; - } - - return range; - }); - delete reqOpts.ranges; - } - var gaxOptions = query.gaxOptions; if (gaxOptions) { diff --git a/src/v1/spanner_client.js b/src/v1/spanner_client.js index a9cf70c28..538044411 100644 --- a/src/v1/spanner_client.js +++ b/src/v1/spanner_client.js @@ -1,10 +1,10 @@ -// Copyright 2017, Google Inc. All rights reserved. +// Copyright 2018 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -165,6 +165,8 @@ class SpannerClient { 'beginTransaction', 'commit', 'rollback', + 'partitionQuery', + 'partitionRead', ]; for (let methodName of spannerStubMethods) { this._innerApiCalls[methodName] = gax.createApiCall( @@ -203,6 +205,7 @@ class SpannerClient { static get scopes() { return [ 'https://www.googleapis.com/auth/cloud-platform', + 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data', ]; } @@ -607,6 +610,11 @@ class SpannerClient { * ResultSetStats. 
* * The number should be among the values of [QueryMode]{@link google.spanner.v1.QueryMode} + * @param {string} [request.partitionToken] + * If present, results will be restricted to the specified partition + * previously created using PartitionQuery(). There must be an exact + * match for the values of fields common to this message and the + * PartitionQueryRequest message used to create this partition_token. * @param {Object} [options] * Optional parameters. You can override the default settings for this call, e.g, timeout, * retries, paginations, etc. See [gax.CallOptions]{@link https://googleapis.github.io/gax-nodejs/global.html#CallOptions} for the details. @@ -707,6 +715,11 @@ class SpannerClient { * ResultSetStats. * * The number should be among the values of [QueryMode]{@link google.spanner.v1.QueryMode} + * @param {string} [request.partitionToken] + * If present, results will be restricted to the specified partition + * previously created using PartitionQuery(). There must be an exact + * match for the values of fields common to this message and the + * PartitionQueryRequest message used to create this partition_token. * @param {Object} [options] * Optional parameters. You can override the default settings for this call, e.g, timeout, * retries, paginations, etc. See [gax.CallOptions]{@link https://googleapis.github.io/gax-nodejs/global.html#CallOptions} for the details. @@ -767,8 +780,10 @@ class SpannerClient { * is present. If index is present, then key_set instead names * index keys in index. * - * Rows are yielded in table primary key order (if index is empty) - * or index key order (if index is non-empty). + * If the partition_token field is empty, rows are yielded + * in table primary key order (if index is empty) or index key order + * (if index is non-empty). If the partition_token field is not + * empty, rows will be yielded in an unspecified order. * * It is not an error for the `key_set` to name rows that do not * exist in the database. 
Read yields nothing for nonexistent rows. @@ -785,7 +800,8 @@ class SpannerClient { * and sorting result rows. See key_set for further information. * @param {number} [request.limit] * If greater than zero, only the first `limit` rows are yielded. If `limit` - * is zero, the default is no limit. + * is zero, the default is no limit. A limit cannot be specified if + * `partition_token` is set. * @param {string} [request.resumeToken] * If this request is resuming a previously interrupted read, * `resume_token` should be copied from the last @@ -793,6 +809,11 @@ class SpannerClient { * enables the new read to resume where the last read left off. The * rest of the request parameters must exactly match the request * that yielded this token. + * @param {string} [request.partitionToken] + * If present, results will be restricted to the specified partition + * previously created using PartitionRead(). There must be an exact + * match for the values of fields common to this message and the + * PartitionReadRequest message used to create this partition_token. * @param {Object} [options] * Optional parameters. You can override the default settings for this call, e.g, timeout, * retries, paginations, etc. See [gax.CallOptions]{@link https://googleapis.github.io/gax-nodejs/global.html#CallOptions} for the details. @@ -863,8 +884,10 @@ class SpannerClient { * is present. If index is present, then key_set instead names * index keys in index. * - * Rows are yielded in table primary key order (if index is empty) - * or index key order (if index is non-empty). + * If the partition_token field is empty, rows are yielded + * in table primary key order (if index is empty) or index key order + * (if index is non-empty). If the partition_token field is not + * empty, rows will be yielded in an unspecified order. * * It is not an error for the `key_set` to name rows that do not * exist in the database. Read yields nothing for nonexistent rows. 
@@ -881,7 +904,8 @@ class SpannerClient { * and sorting result rows. See key_set for further information. * @param {number} [request.limit] * If greater than zero, only the first `limit` rows are yielded. If `limit` - * is zero, the default is no limit. + * is zero, the default is no limit. A limit cannot be specified if + * `partition_token` is set. * @param {string} [request.resumeToken] * If this request is resuming a previously interrupted read, * `resume_token` should be copied from the last @@ -889,6 +913,11 @@ class SpannerClient { * enables the new read to resume where the last read left off. The * rest of the request parameters must exactly match the request * that yielded this token. + * @param {string} [request.partitionToken] + * If present, results will be restricted to the specified partition + * previously created using PartitionRead(). There must be an exact + * match for the values of fields common to this message and the + * PartitionReadRequest message used to create this partition_token. * @param {Object} [options] * Optional parameters. You can override the default settings for this call, e.g, timeout, * retries, paginations, etc. See [gax.CallOptions]{@link https://googleapis.github.io/gax-nodejs/global.html#CallOptions} for the details. @@ -1111,6 +1140,194 @@ class SpannerClient { return this._innerApiCalls.rollback(request, options, callback); } + /** + * Creates a set of partition tokens that can be used to execute a query + * operation in parallel. Each of the returned partition tokens can be used + * by ExecuteStreamingSql to specify a subset + * of the query result to read. The same session and read-only transaction + * must be used by the PartitionQueryRequest used to create the + * partition tokens and the ExecuteSqlRequests that use the partition tokens. + * Partition tokens become invalid when the session used to create them + * is deleted or begins a new transaction. 
+ *
+ * @param {Object} request
+ *   The request object that will be sent.
+ * @param {string} request.session
+ *   Required. The session used to create the partitions.
+ * @param {string} request.sql
+ *   The query request to generate partitions for. The request will fail if
+ *   the query is not root partitionable. The query plan of a root
+ *   partitionable query has a single distributed union operator. A distributed
+ *   union operator conceptually divides one or more tables into multiple
+ *   splits, remotely evaluates a subquery independently on each split, and
+ *   then unions all results.
+ * @param {Object} [request.transaction]
+ *   Read only snapshot transactions are supported, read/write and single use
+ *   transactions are not.
+ *
+ *   This object should have the same structure as [TransactionSelector]{@link google.spanner.v1.TransactionSelector}
+ * @param {Object} [request.params]
+ *   The SQL query string can contain parameter placeholders. A parameter
+ *   placeholder consists of `'@'` followed by the parameter
+ *   name. Parameter names consist of any combination of letters,
+ *   numbers, and underscores.
+ *
+ *   Parameters can appear anywhere that a literal value is expected. The same
+ *   parameter name can be used more than once, for example:
+ *   `"WHERE id > @msg_id AND id < @msg_id + 100"`
+ *
+ *   It is an error to execute an SQL query with unbound parameters.
+ *
+ *   Parameter values are specified using `params`, which is a JSON
+ *   object whose keys are parameter names, and whose values are the
+ *   corresponding parameter values.
+ *
+ *   This object should have the same structure as [Struct]{@link google.protobuf.Struct}
+ * @param {Object.<string, Object>} [request.paramTypes]
+ *   It is not always possible for Cloud Spanner to infer the right SQL type
+ *   from a JSON value. For example, values of type `BYTES` and values
+ *   of type `STRING` both appear in params as JSON strings.
+ * + * In these cases, `param_types` can be used to specify the exact + * SQL type for some or all of the SQL query parameters. See the + * definition of Type for more information + * about SQL types. + * @param {Object} [request.partitionOptions] + * Additional options that affect how many partitions are created. + * + * This object should have the same structure as [PartitionOptions]{@link google.spanner.v1.PartitionOptions} + * @param {Object} [options] + * Optional parameters. You can override the default settings for this call, e.g, timeout, + * retries, paginations, etc. See [gax.CallOptions]{@link https://googleapis.github.io/gax-nodejs/global.html#CallOptions} for the details. + * @param {function(?Error, ?Object)} [callback] + * The function which will be called with the result of the API call. + * + * The second parameter to the callback is an object representing [PartitionResponse]{@link google.spanner.v1.PartitionResponse}. + * @returns {Promise} - The promise which resolves to an array. + * The first element of the array is an object representing [PartitionResponse]{@link google.spanner.v1.PartitionResponse}. + * The promise has a method named "cancel" which cancels the ongoing API call. + * + * @example + * + * const spanner = require('@google-cloud/spanner'); + * + * var client = new spanner.v1.SpannerClient({ + * // optional auth parameters. 
+ * }); + * + * var formattedSession = client.sessionPath('[PROJECT]', '[INSTANCE]', '[DATABASE]', '[SESSION]'); + * var sql = ''; + * var request = { + * session: formattedSession, + * sql: sql, + * }; + * client.partitionQuery(request) + * .then(responses => { + * var response = responses[0]; + * // doThingsWith(response) + * }) + * .catch(err => { + * console.error(err); + * }); + */ + partitionQuery(request, options, callback) { + if (options instanceof Function && callback === undefined) { + callback = options; + options = {}; + } + options = options || {}; + + return this._innerApiCalls.partitionQuery(request, options, callback); + } + + /** + * Creates a set of partition tokens that can be used to execute a read + * operation in parallel. Each of the returned partition tokens can be used + * by StreamingRead to specify a subset of the read + * result to read. The same session and read-only transaction must be used by + * the PartitionReadRequest used to create the partition tokens and the + * ReadRequests that use the partition tokens. + * Partition tokens become invalid when the session used to create them + * is deleted or begins a new transaction. + * + * @param {Object} request + * The request object that will be sent. + * @param {string} request.session + * Required. The session used to create the partitions. + * @param {string} request.table + * Required. The name of the table in the database to be read. + * @param {Object} request.keySet + * Required. `key_set` identifies the rows to be yielded. `key_set` names the + * primary keys of the rows in table to be yielded, unless index + * is present. If index is present, then key_set instead names + * index keys in index. + * + * It is not an error for the `key_set` to name rows that do not + * exist in the database. Read yields nothing for nonexistent rows. 
+ * + * This object should have the same structure as [KeySet]{@link google.spanner.v1.KeySet} + * @param {Object} [request.transaction] + * Read only snapshot transactions are supported, read/write and single use + * transactions are not. + * + * This object should have the same structure as [TransactionSelector]{@link google.spanner.v1.TransactionSelector} + * @param {string} [request.index] + * If non-empty, the name of an index on table. This index is + * used instead of the table primary key when interpreting key_set + * and sorting result rows. See key_set for further information. + * @param {string[]} [request.columns] + * The columns of table to be returned for each row matching + * this request. + * @param {Object} [request.partitionOptions] + * Additional options that affect how many partitions are created. + * + * This object should have the same structure as [PartitionOptions]{@link google.spanner.v1.PartitionOptions} + * @param {Object} [options] + * Optional parameters. You can override the default settings for this call, e.g, timeout, + * retries, paginations, etc. See [gax.CallOptions]{@link https://googleapis.github.io/gax-nodejs/global.html#CallOptions} for the details. + * @param {function(?Error, ?Object)} [callback] + * The function which will be called with the result of the API call. + * + * The second parameter to the callback is an object representing [PartitionResponse]{@link google.spanner.v1.PartitionResponse}. + * @returns {Promise} - The promise which resolves to an array. + * The first element of the array is an object representing [PartitionResponse]{@link google.spanner.v1.PartitionResponse}. + * The promise has a method named "cancel" which cancels the ongoing API call. + * + * @example + * + * const spanner = require('@google-cloud/spanner'); + * + * var client = new spanner.v1.SpannerClient({ + * // optional auth parameters. 
+ * }); + * + * var formattedSession = client.sessionPath('[PROJECT]', '[INSTANCE]', '[DATABASE]', '[SESSION]'); + * var table = ''; + * var keySet = {}; + * var request = { + * session: formattedSession, + * table: table, + * keySet: keySet, + * }; + * client.partitionRead(request) + * .then(responses => { + * var response = responses[0]; + * // doThingsWith(response) + * }) + * .catch(err => { + * console.error(err); + * }); + */ + partitionRead(request, options, callback) { + if (options instanceof Function && callback === undefined) { + callback = options; + options = {}; + } + options = options || {}; + + return this._innerApiCalls.partitionRead(request, options, callback); + } + // -------------------- // -- Path templates -- // -------------------- diff --git a/src/v1/spanner_client_config.json b/src/v1/spanner_client_config.json index 9b8171785..3dd3ad7e5 100644 --- a/src/v1/spanner_client_config.json +++ b/src/v1/spanner_client_config.json @@ -86,6 +86,16 @@ "timeout_millis": 30000, "retry_codes_name": "idempotent", "retry_params_name": "default" + }, + "PartitionQuery": { + "timeout_millis": 3600000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + }, + "PartitionRead": { + "timeout_millis": 30000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" } } } diff --git a/test/batch-transaction.js b/test/batch-transaction.js new file mode 100644 index 000000000..762adf8ff --- /dev/null +++ b/test/batch-transaction.js @@ -0,0 +1,336 @@ +/*! + * Copyright 2018 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var assert = require('assert'); +var extend = require('extend'); +var proxyquire = require('proxyquire'); +var util = require('@google-cloud/common').util; + +var promisified = false; +var fakeUtil = extend({}, util, { + promisifyAll: function(Class, options) { + if (Class.name !== 'BatchTransaction') { + return; + } + + assert.deepEqual(options.exclude, ['identifier']); + promisified = true; + }, +}); + +var fakeCodec = { + encode: util.noop, + Int: function() {}, + Float: function() {}, + SpannerDate: function() {}, +}; + +function FakeTransaction(session) { + this.calledWith_ = arguments; + this.session = session; +} + +describe('BatchTransaction', function() { + var BatchTransaction; + var batchTransaction; + + var SESSION = {}; + + before(function() { + BatchTransaction = proxyquire('../src/batch-transaction.js', { + '@google-cloud/common': { + util: fakeUtil, + }, + './codec.js': fakeCodec, + './transaction.js': FakeTransaction, + }); + }); + + beforeEach(function() { + batchTransaction = new BatchTransaction(SESSION); + }); + + describe('instantiation', function() { + it('should promisify all the things', function() { + assert(promisified); + }); + + it('should extend the Transaction class', function() { + var batchTransaction = new BatchTransaction(SESSION); + + assert(batchTransaction instanceof FakeTransaction); + assert.strictEqual(batchTransaction.calledWith_[0], SESSION); + assert.deepEqual(batchTransaction.calledWith_[1], {readOnly: true}); + }); + }); + + describe('close', function() { + it('should delete the session', 
function(done) { + SESSION.delete = function(callback) { + callback(); // the done fn + }; + + batchTransaction.close(done); + }); + }); + + describe('createQueryPartitions', function() { + var GAX_OPTS = {a: 'b'}; + var QUERY = { + sql: 'SELECT * FROM Singers', + gaxOptions: GAX_OPTS, + }; + + it('should make the correct request', function(done) { + fakeCodec.encodeQuery = function(query) { + assert.deepEqual(query, {sql: QUERY.sql}); + return QUERY; + }; + + batchTransaction.createPartitions_ = function(config, callback) { + assert.strictEqual(config.client, 'SpannerClient'); + assert.strictEqual(config.method, 'partitionQuery'); + assert.strictEqual(config.reqOpts, QUERY); + callback(); // the done fn + }; + + batchTransaction.createQueryPartitions(QUERY.sql, done); + }); + + it('should remove gax options from the query', function(done) { + var fakeQuery = { + sql: QUERY.sql, + gaxOptions: GAX_OPTS, + }; + + fakeCodec.encodeQuery = function(query) { + assert.strictEqual(query, fakeQuery); + return extend({a: 'b'}, QUERY); + }; + + batchTransaction.createPartitions_ = function(config, callback) { + assert.deepEqual(config.reqOpts, {sql: QUERY.sql, a: 'b'}); + assert.strictEqual(config.gaxOpts, GAX_OPTS); + callback(); // the done fn + }; + + batchTransaction.createQueryPartitions(fakeQuery, done); + }); + }); + + describe('createPartitions_', function() { + var SESSION = {formattedName_: 'abcdef'}; + var ID = 'ghijkl'; + var TIMESTAMP = {seconds: 0, nanos: 0}; + + var PARTITIONS = [{partitionToken: 'a'}, {partitionToken: 'b'}]; + var RESPONSE = {partitions: PARTITIONS}; + + var QUERY = {a: 'b'}; + var CONFIG = {reqOpts: QUERY}; + + beforeEach(function() { + batchTransaction.session = SESSION; + batchTransaction.id = ID; + + batchTransaction.request = function(config, callback) { + callback(null, RESPONSE); + }; + }); + + it('should insert the session and transaction ids', function(done) { + batchTransaction.request = function(config) { + 
assert.strictEqual(config.reqOpts.a, 'b'); + assert.strictEqual(config.reqOpts.session, SESSION.formattedName_); + assert.deepEqual(config.reqOpts.transaction, {id: ID}); + done(); + }; + + batchTransaction.createPartitions_(CONFIG, assert.ifError); + }); + + it('should return any request errors', function(done) { + var error = new Error('err'); + var response = {}; + + batchTransaction.request = function(config, callback) { + callback(error, response); + }; + + batchTransaction.createPartitions_(CONFIG, function(err, parts, resp) { + assert.strictEqual(err, error); + assert.strictEqual(parts, null); + assert.strictEqual(resp, response); + done(); + }); + }); + + it('should return the prepared partition configs', function(done) { + var expectedQuery = { + a: 'b', + session: SESSION.formattedName_, + transaction: {id: ID}, + }; + + batchTransaction.createPartitions_(CONFIG, function(err, parts) { + assert.ifError(err); + + parts.forEach(function(partition, i) { + var expectedPartition = extend({}, expectedQuery, PARTITIONS[i]); + assert.deepEqual(partition, expectedPartition); + }); + + done(); + }); + }); + + it('should update the transaction with returned metadata', function(done) { + var response = extend({}, RESPONSE, { + transaction: { + id: ID, + readTimestamp: TIMESTAMP, + }, + }); + + batchTransaction.request = function(config, callback) { + callback(null, response); + }; + + batchTransaction.createPartitions_(CONFIG, function(err, parts, resp) { + assert.strictEqual(resp, response); + assert.strictEqual(batchTransaction.id, ID); + assert.strictEqual(batchTransaction.readTimestamp, TIMESTAMP); + done(); + }); + }); + }); + + describe('createReadPartitions', function() { + var GAX_OPTS = {}; + var QUERY = {table: 'abc', gaxOptions: GAX_OPTS}; + + it('should make the correct request', function(done) { + var query = {}; + + fakeCodec.encodeRead = function(options) { + assert.strictEqual(options, query); + return QUERY; + }; + + 
batchTransaction.createPartitions_ = function(config, callback) { + assert.strictEqual(config.client, 'SpannerClient'); + assert.strictEqual(config.method, 'partitionRead'); + assert.strictEqual(config.reqOpts, QUERY); + callback(); // the done fn + }; + + batchTransaction.createReadPartitions(query, done); + }); + + it('should remove gax options from the query', function(done) { + var query = {gaxOptions: GAX_OPTS}; + + fakeCodec.encodeRead = function() { + return extend({}, QUERY); + }; + + batchTransaction.createPartitions_ = function(config, callback) { + assert.deepEqual(config.reqOpts, {table: QUERY.table}); + assert.strictEqual(config.gaxOpts, GAX_OPTS); + callback(); // the done fn + }; + + batchTransaction.createReadPartitions(query, done); + }); + }); + + describe('execute', function() { + it('should make read requests for read partitions', function(done) { + var partition = {table: 'abc'}; + + batchTransaction.read = function(table, options, callback) { + assert.strictEqual(table, partition.table); + assert.strictEqual(options, partition); + callback(); // the done fn + }; + + batchTransaction.execute(partition, done); + }); + + it('should make query requests for non-read partitions', function(done) { + var partition = {sql: 'SELECT * FROM Singers'}; + + batchTransaction.run = function(query, callback) { + assert.strictEqual(query, partition); + callback(); // the done fn + }; + + batchTransaction.execute(partition, done); + }); + }); + + describe('executeStream', function() { + var STREAM = {}; + + it('should make read streams for read partitions', function() { + var partition = {table: 'abc'}; + + batchTransaction.createReadStream = function(table, options) { + assert.strictEqual(table, partition.table); + assert.strictEqual(options, partition); + return STREAM; + }; + + var stream = batchTransaction.executeStream(partition); + + assert.strictEqual(stream, STREAM); + }); + + it('should make query streams for query partitions', function() { + var 
partition = {sql: 'SELECT * FROM Singers'}; + + batchTransaction.runStream = function(query) { + assert.strictEqual(query, partition); + return STREAM; + }; + + var stream = batchTransaction.executeStream(partition); + + assert.strictEqual(stream, STREAM); + }); + }); + + describe('identifier', function() { + var ID = Buffer.from('abc'); + var SESSION = {id: 'def'}; + var TIMESTAMP = {seconds: 0, nanos: 0}; + + beforeEach(function() { + batchTransaction.id = ID; + batchTransaction.session = SESSION; + batchTransaction.readTimestamp = TIMESTAMP; + }); + + it('should create a transaction identifier', function() { + var expectedId = ID.toString('base64'); + var identifier = batchTransaction.identifier(); + + assert.strictEqual(identifier.transaction, expectedId); + assert.strictEqual(identifier.session, SESSION.id); + assert.strictEqual(identifier.timestamp, TIMESTAMP); + }); + }); +}); diff --git a/test/codec.js b/test/codec.js index c755c9ff9..3481f2f3b 100644 --- a/test/codec.js +++ b/test/codec.js @@ -785,4 +785,193 @@ describe('codec', function() { assert.strictEqual(encodedQuery.types, undefined); }); }); + + describe('encodeRead', function() { + describe('query.keys', function() { + it('should encode and map input to keySet', function() { + var query = { + keys: ['key', ['composite', 'key']], + }; + + var encodedValue = {}; + var numEncodeRequests = 0; + + codec.encode = function(key) { + numEncodeRequests++; + + switch (numEncodeRequests) { + case 1: { + assert.strictEqual(key, query.keys[0]); + break; + } + case 2: { + assert.strictEqual(key, query.keys[1][0]); + break; + } + case 3: { + assert.strictEqual(key, query.keys[1][1]); + break; + } + } + + return encodedValue; + }; + + var expectedKeys = [ + { + values: [encodedValue], + }, + { + values: [encodedValue, encodedValue], + }, + ]; + + var encoded = codec.encodeRead(query); + assert.deepStrictEqual(encoded.keySet.keys, expectedKeys); + }); + + it('should accept just a key', function() { + var query = 
'key'; + + var encodedValue = {}; + codec.encode = function(key) { + assert.strictEqual(key, query); + return encodedValue; + }; + + var encoded = codec.encodeRead(query); + + assert.strictEqual(encoded.keySet.keys[0].values[0], encodedValue); + }); + + it('should accept just an array of keys', function() { + var query = ['key']; + + var encodedValue = {}; + codec.encode = function(key) { + assert.strictEqual(key, query[0]); + return encodedValue; + }; + + var encoded = codec.encodeRead(query); + + assert.strictEqual(encoded.keySet.keys[0].values[0], encodedValue); + }); + + it('should arrify query.keys', function() { + var query = { + keys: 'key', + }; + + var encodedValue = {}; + codec.encode = function(key) { + assert.strictEqual(key, query.keys); + return encodedValue; + }; + + var encoded = codec.encodeRead(query); + + assert.strictEqual(encoded.keySet.keys[0].values[0], encodedValue); + }); + + it('should remove keys property from request object', function() { + var query = { + keys: ['key'], + }; + + var encoded = codec.encodeRead(query); + + assert.strictEqual(encoded.keys, undefined); + }); + }); + + describe('query.ranges', function() { + it('should encode/map the inputs', function() { + var query = { + ranges: [ + { + startOpen: 'key', + endClosed: ['composite', 'key'], + }, + ], + }; + + var encodedValue = {}; + var numEncodeRequests = 0; + + codec.encode = function(key) { + var keys = ['key', 'composite', 'key']; + + assert.strictEqual(key, keys[numEncodeRequests++]); + return encodedValue; + }; + + var expectedRanges = [ + { + startOpen: { + values: [encodedValue], + }, + endClosed: { + values: [encodedValue, encodedValue], + }, + }, + ]; + + var encoded = codec.encodeRead(query); + + assert.strictEqual(numEncodeRequests, 3); + assert.deepStrictEqual(encoded.keySet.ranges, expectedRanges); + }); + + it('should arrify query.ranges', function() { + var query = { + ranges: [ + { + startOpen: 'start', + endClosed: 'end', + }, + ], + }; + + var 
encodedValue = {}; + var numEncodeRequests = 0; + + codec.encode = function(key) { + assert.strictEqual(key, ['start', 'end'][numEncodeRequests++]); + return encodedValue; + }; + + var expectedRanges = [ + { + startOpen: { + values: [encodedValue], + }, + endClosed: { + values: [encodedValue], + }, + }, + ]; + + var encoded = codec.encodeRead(query); + + assert.strictEqual(numEncodeRequests, 2); + assert.deepStrictEqual(encoded.keySet.ranges, expectedRanges); + }); + + it('should remove the ranges property from the query', function() { + var query = { + ranges: [ + { + startOpen: 'start', + endClosed: 'end', + }, + ], + }; + + var encoded = codec.encodeRead(query); + + assert.strictEqual(encoded.ranges, undefined); + }); + }); + }); }); diff --git a/test/database.js b/test/database.js index 47c638e0c..ace8cd636 100644 --- a/test/database.js +++ b/test/database.js @@ -33,6 +33,7 @@ var fakeUtil = extend({}, util, { promisified = true; assert.deepEqual(options.exclude, [ + 'batchTransaction', 'delete', 'getMetadata', 'runTransaction', @@ -43,6 +44,10 @@ var fakeUtil = extend({}, util, { }, }); +function FakeBatchTransaction() { + this.calledWith_ = arguments; +} + function FakeGrpcServiceObject() { this.calledWith_ = arguments; } @@ -111,6 +116,7 @@ describe('Database', function() { ServiceObject: FakeGrpcServiceObject, }, modelo: fakeModelo, + './batch-transaction.js': FakeBatchTransaction, './codec.js': fakeCodec, './partial-result-stream.js': FakePartialResultStream, './session-pool.js': FakeSessionPool, @@ -233,6 +239,43 @@ describe('Database', function() { }); }); + describe('batchTransaction', function() { + var SESSION = {id: 'hijklmnop'}; + var ID = 'abcdefg'; + var READ_TIMESTAMP = {seconds: 0, nanos: 0}; + + it('should create a transaction object', function() { + var identifier = { + session: SESSION, + transaction: ID, + readTimestamp: READ_TIMESTAMP, + }; + + var transaction = database.batchTransaction(identifier); + + assert(transaction instanceof 
FakeBatchTransaction); + assert.deepEqual(transaction.calledWith_[0], SESSION); + assert.strictEqual(transaction.id, ID); + assert.strictEqual(transaction.readTimestamp, READ_TIMESTAMP); + }); + + it('should optionally accept a session id', function() { + var identifier = { + session: SESSION.id, + transaction: ID, + readTimestamp: READ_TIMESTAMP, + }; + + database.session_ = function(id) { + assert.strictEqual(id, SESSION.id); + return SESSION; + }; + + var transaction = database.batchTransaction(identifier); + assert.deepEqual(transaction.calledWith_[0], SESSION); + }); + }); + describe('close', function() { describe('success', function() { beforeEach(function() { @@ -306,6 +349,77 @@ describe('Database', function() { }); }); + describe('createBatchTransaction', function() { + var SESSION = {}; + var RESPONSE = {a: 'b'}; + + beforeEach(function() { + database.createSession = function(callback) { + callback(null, SESSION, RESPONSE); + }; + }); + + it('should return any session creation errors', function(done) { + var error = new Error('err'); + var apiResponse = {c: 'd'}; + + database.createSession = function(callback) { + callback(error, null, apiResponse); + }; + + database.createBatchTransaction(function(err, transaction, resp) { + assert.strictEqual(err, error); + assert.strictEqual(transaction, null); + assert.strictEqual(resp, apiResponse); + done(); + }); + }); + + it('should create a transaction', function(done) { + var opts = {a: 'b'}; + + var fakeTransaction = { + begin: function(callback) { + callback(null, RESPONSE); + }, + }; + + database.batchTransaction = function(identifier) { + assert.deepEqual(identifier, {session: SESSION}); + return fakeTransaction; + }; + + database.createBatchTransaction(opts, function(err, transaction, resp) { + assert.strictEqual(err, null); + assert.strictEqual(transaction, fakeTransaction); + assert.deepEqual(transaction.options, opts); + assert.strictEqual(resp, RESPONSE); + done(); + }); + }); + + it('should return any 
transaction errors', function(done) { + var error = new Error('err'); + + var fakeTransaction = { + begin: function(callback) { + callback(error, RESPONSE); + }, + }; + + database.batchTransaction = function() { + return fakeTransaction; + }; + + database.createBatchTransaction(function(err, transaction, resp) { + assert.strictEqual(err, error); + assert.strictEqual(transaction, null); + assert.strictEqual(resp, RESPONSE); + done(); + }); + }); + }); + describe('createTable', function() { var TABLE_NAME = 'table-name'; var SCHEMA = 'CREATE TABLE `' + TABLE_NAME + '`'; @@ -1028,6 +1142,77 @@ describe('Database', function() { }); }); + describe('getSessions', function() { + it('should make the correct request', function(done) { + var gaxOpts = {}; + var options = {a: 'a', gaxOptions: gaxOpts}; + + var expectedReqOpts = extend({}, options, { + database: database.formattedName_, + }); + + delete expectedReqOpts.gaxOptions; + + database.request = function(config) { + assert.strictEqual(config.client, 'SpannerClient'); + assert.strictEqual(config.method, 'listSessions'); + assert.deepEqual(config.reqOpts, expectedReqOpts); + assert.strictEqual(config.gaxOpts, gaxOpts); + done(); + }; + + database.getSessions(options, assert.ifError); + }); + + it('should not require a query', function(done) { + database.request = function(config) { + assert.deepEqual(config.reqOpts, { + database: database.formattedName_, + }); + + done(); + }; + + database.getSessions(assert.ifError); + }); + + it('should return all arguments on error', function(done) { + var ARGS = [new Error('err'), null, {}]; + + database.request = function(config, callback) { + callback.apply(null, ARGS); + }; + + database.getSessions(function() { + var args = [].slice.call(arguments); + assert.deepEqual(args, ARGS); + done(); + }); + }); + + it('should create and return Session objects', function(done) { + var SESSIONS = [{name: 'abc'}]; + var SESSION_INSTANCE = {}; + var RESPONSE = {}; + + database.request = 
function(config, callback) { + callback(null, SESSIONS, RESPONSE); + }; + + database.session_ = function(name) { + assert.strictEqual(name, SESSIONS[0].name); + return SESSION_INSTANCE; + }; + + database.getSessions(function(err, sessions, resp) { + assert.ifError(err); + assert.strictEqual(sessions[0], SESSION_INSTANCE); + assert.strictEqual(resp, RESPONSE); + done(); + }); + }); + }); + describe('session_', function() { var NAME = 'session-name'; diff --git a/test/index.js b/test/index.js index de930cb53..d6fc939f9 100644 --- a/test/index.js +++ b/test/index.js @@ -191,7 +191,7 @@ describe('Spanner', function() { var options = spanner.calledWith_[1]; assert.deepEqual(config, { - baseUrl: 'spanner.googleapis.com', + baseUrl: fakeV1.SpannerClient.servicePath, protosDir: path.resolve(__dirname, '../protos'), protoServices: { Operations: { @@ -212,6 +212,15 @@ describe('Spanner', function() { }) ); }); + + it('should optionally accept a servicePath', function() { + var SERVICE_PATH = 'abc.def.ghi'; + var spanner = new Spanner({servicePath: SERVICE_PATH}); + + var config = spanner.calledWith_[0]; + + assert.strictEqual(config.baseUrl, SERVICE_PATH); + }); }); describe('date', function() { diff --git a/test/transaction-request.js b/test/transaction-request.js index 8c25936cd..af4f93285 100644 --- a/test/transaction-request.js +++ b/test/transaction-request.js @@ -197,7 +197,13 @@ describe('TransactionRequest', function() { describe('createReadStream', function() { var TABLE = 'table-name'; - var QUERY = {}; + var QUERY = {e: 'f'}; + + beforeEach(function() { + fakeCodec.encodeRead = function() { + return QUERY; + }; + }); it('should accept a query object', function(done) { var query = { @@ -205,10 +211,15 @@ describe('TransactionRequest', function() { c: 'd', }; - var expectedReqOpts = extend({}, query, { + var expectedReqOpts = extend({}, QUERY, { table: TABLE, }); + fakeCodec.encodeRead = function(readRequest) { + assert.strictEqual(readRequest, query); + return 
QUERY; + }; + transactionRequest.requestStream = function(options) { assert.deepEqual(options.reqOpts, expectedReqOpts); done(); @@ -225,12 +236,15 @@ describe('TransactionRequest', function() { transactionRequest.transaction = true; transactionRequest.id = ID; - var expectedReqOpts = { - table: TABLE, - transaction: { - id: ID, + var expectedReqOpts = extend( + { + table: TABLE, + transaction: { + id: ID, + }, }, - }; + QUERY + ); transactionRequest.requestStream = function(options) { assert.deepEqual(options.reqOpts, expectedReqOpts); @@ -243,243 +257,6 @@ describe('TransactionRequest', function() { makeRequestFn(); }); - describe('query.keys', function() { - it('should encode and map input to keySet.keys[].values', function(done) { - var query = { - keys: ['key', ['composite', 'key']], - }; - - var encodedValue = {}; - var numEncodeRequests = 0; - - fakeCodec.encode = function(key) { - numEncodeRequests++; - - switch (numEncodeRequests) { - case 1: { - assert.strictEqual(key, query.keys[0]); - break; - } - case 2: { - assert.strictEqual(key, query.keys[1][0]); - break; - } - case 3: { - assert.strictEqual(key, query.keys[1][1]); - break; - } - } - - return encodedValue; - }; - - transactionRequest.requestStream = function(options) { - var expectedKeys = [ - { - values: [encodedValue], - }, - { - values: [encodedValue, encodedValue], - }, - ]; - - assert.deepStrictEqual(options.reqOpts.keySet.keys, expectedKeys); - done(); - }; - - var stream = transactionRequest.createReadStream(TABLE, query); - var makeRequestFn = stream.calledWith_[0]; - makeRequestFn(); - }); - - it('should accept just a key', function(done) { - var query = 'key'; - - var encodedValue = {}; - fakeCodec.encode = function(key) { - assert.strictEqual(key, query); - return encodedValue; - }; - - transactionRequest.requestStream = function(options) { - assert.strictEqual( - options.reqOpts.keySet.keys[0].values[0], - encodedValue - ); - done(); - }; - - var stream = 
transactionRequest.createReadStream(TABLE, query); - var makeRequestFn = stream.calledWith_[0]; - makeRequestFn(); - }); - - it('should accept just an array of keys', function(done) { - var query = ['key']; - - var encodedValue = {}; - fakeCodec.encode = function(key) { - assert.strictEqual(key, query[0]); - return encodedValue; - }; - - transactionRequest.requestStream = function(options) { - assert.strictEqual( - options.reqOpts.keySet.keys[0].values[0], - encodedValue - ); - done(); - }; - - var stream = transactionRequest.createReadStream(TABLE, query); - var makeRequestFn = stream.calledWith_[0]; - makeRequestFn(); - }); - - it('should arrify query.keys', function(done) { - var query = { - keys: 'key', - }; - - var encodedValue = {}; - fakeCodec.encode = function(key) { - assert.strictEqual(key, query.keys); - return encodedValue; - }; - - transactionRequest.requestStream = function(options) { - assert.strictEqual( - options.reqOpts.keySet.keys[0].values[0], - encodedValue - ); - done(); - }; - - var stream = transactionRequest.createReadStream(TABLE, query); - var makeRequestFn = stream.calledWith_[0]; - makeRequestFn(); - }); - - it('should remove keys property from request object', function(done) { - var query = { - keys: ['key'], - }; - - transactionRequest.requestStream = function(options) { - assert.strictEqual(options.reqOpts.keys, undefined); - done(); - }; - - var stream = transactionRequest.createReadStream(TABLE, query); - var makeRequestFn = stream.calledWith_[0]; - makeRequestFn(); - }); - }); - - describe('query.ranges', function() { - it('should encode/map the inputs', function(done) { - var query = { - ranges: [ - { - startOpen: 'key', - endClosed: ['composite', 'key'], - }, - ], - }; - - var encodedValue = {}; - var numEncodeRequests = 0; - - fakeCodec.encode = function(key) { - var keys = ['key', 'composite', 'key']; - - assert.strictEqual(key, keys[numEncodeRequests++]); - return encodedValue; - }; - - transactionRequest.requestStream = 
function(options) { - var expectedRanges = [ - { - startOpen: { - values: [encodedValue], - }, - endClosed: { - values: [encodedValue, encodedValue], - }, - }, - ]; - - assert.strictEqual(numEncodeRequests, 3); - assert.deepStrictEqual(options.reqOpts.keySet.ranges, expectedRanges); - done(); - }; - - var stream = transactionRequest.createReadStream(TABLE, query); - var makeRequestFn = stream.calledWith_[0]; - makeRequestFn(); - }); - - it('should arrify query.ranges', function(done) { - var query = { - ranges: [ - { - startOpen: 'start', - endClosed: 'end', - }, - ], - }; - - var encodedValue = {}; - var numEncodeRequests = 0; - - fakeCodec.encode = function(key) { - assert.strictEqual(key, ['start', 'end'][numEncodeRequests++]); - return encodedValue; - }; - - transactionRequest.requestStream = function(options) { - var expectedRanges = [ - { - startOpen: { - values: [encodedValue], - }, - endClosed: { - values: [encodedValue], - }, - }, - ]; - - assert.strictEqual(numEncodeRequests, 2); - assert.deepStrictEqual(options.reqOpts.keySet.ranges, expectedRanges); - done(); - }; - - var stream = transactionRequest.createReadStream(TABLE, query); - var makeRequestFn = stream.calledWith_[0]; - makeRequestFn(); - }); - - it('should remove the ranges property from the query', function(done) { - var query = { - ranges: [ - { - startOpen: 'start', - endClosed: 'end', - }, - ], - }; - - transactionRequest.requestStream = function(options) { - assert.strictEqual(options.reqOpts.ranges, undefined); - done(); - }; - - var stream = transactionRequest.createReadStream(TABLE, query); - var makeRequestFn = stream.calledWith_[0]; - makeRequestFn(); - }); - }); - describe('PartialResultStream', function() { it('should return PartialResultStream', function() { var stream = transactionRequest.createReadStream(TABLE, QUERY); @@ -491,7 +268,7 @@ describe('TransactionRequest', function() { a: 'b', }; - var expectedQuery = extend({}, query, { + var expectedQuery = extend({}, QUERY, { 
table: TABLE, resumeToken: undefined, }); From df0116e5f450cc8bf4759e49494b08b92dc83634 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Mon, 26 Feb 2018 18:40:17 -0500 Subject: [PATCH 2/4] fix linting errors --- samples/batch.js | 10 +++++++--- samples/system-test/spanner.test.js | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/samples/batch.js b/samples/batch.js index a2775675c..e8c3c0b7c 100644 --- a/samples/batch.js +++ b/samples/batch.js @@ -86,7 +86,9 @@ function createQueryPartitions(instanceId, databaseId, identifier, projectId) { .createQueryPartitions(query) .then(data => { const partitions = data[0]; - console.log(`Successfully created query partitions.`); + console.log( + `Successfully created ${partitions.length} query partitions.` + ); }) .catch(err => { console.error('ERROR:', err); @@ -127,7 +129,7 @@ function createReadPartitions(instanceId, databaseId, identifier, projectId) { .createReadPartitions(options) .then(data => { const partitions = data[0]; - console.log(`Successfully created read partitions.`); + console.log(`Successfully created ${partitions.length} read partitions.`); }) .catch(err => { console.error('ERROR:', err); @@ -169,7 +171,9 @@ function executePartition( .execute(partition) .then(data => { const rows = data[0]; - console.log(`Successfully executed partition.`); + console.log( + `Successfully received ${rows.length} from executed partition.` + ); }) .catch(err => { console.error('ERROR:', err); diff --git a/samples/system-test/spanner.test.js b/samples/system-test/spanner.test.js index cb2e0b508..3904d9d55 100644 --- a/samples/system-test/spanner.test.js +++ b/samples/system-test/spanner.test.js @@ -323,7 +323,7 @@ test.serial(`should create query partitions`, async t => { let output = results.stdout + results.stderr; - t.regex(output, /Successfully created query partitions\./); + t.regex(output, /Successfully created \d query partitions\./); await transaction.close(); }); @@ -342,7 +342,7 @@ 
test.serial(`should create read partitions`, async t => { let output = results.stdout + results.stderr; - t.regex(output, /Successfully created read partitions\./); + t.regex(output, /Successfully created \d read partitions\./); await transaction.close(); }); @@ -365,7 +365,7 @@ test.serial(`should execute a partition`, async t => { let output = results.stdout + results.stderr; - t.regex(output, /Successfully executed partition\./); + t.regex(output, /Successfully received \d from executed partition\./); await transaction.close(); }); From 093a80c2cbcd00f78a164a069d87ee664a598ce4 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Wed, 28 Feb 2018 13:38:34 -0500 Subject: [PATCH 3/4] fix documentation errors per pr feedback --- src/batch-transaction.js | 10 ++++++++-- src/database.js | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/batch-transaction.js b/src/batch-transaction.js index 5f36d04ac..90c3f145a 100644 --- a/src/batch-transaction.js +++ b/src/batch-transaction.js @@ -108,10 +108,12 @@ BatchTransaction.prototype.close = function(callback) { * @param {string|object} query A SQL query or * [`ExecuteSqlRequest`](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest) * object. + * @param {object} [query.gaxOptions] Request configuration options, outlined + * here: https://googleapis.github.io/gax-nodejs/CallSettings.html. * @param {object} [query.params] A map of parameter name to values. * @param {object} [query.partitionOptions] A map of partition options. * @param {object} [query.types] A map of parameter types. - * @param {CreateQueryPartitionsCallback} [callback] Callback function. + * @param {CreateQueryPartitionsCallback} [callback] Callback callback function. 
* @returns {Promise} * * @example include:samples/batch.js @@ -185,6 +187,8 @@ BatchTransaction.prototype.createPartitions_ = function(config, callback) { * @typedef {object} ReadPartition * @mixes ReadRequestOptions * @property {string} partitionToken The partition token. + * @property {object} [gaxOptions] Request configuration options, outlined + * here: https://googleapis.github.io/gax-nodejs/CallSettings.html. */ /** * @typedef {array} CreateReadPartitionsResponse @@ -236,6 +240,8 @@ BatchTransaction.prototype.createReadPartitions = function(options, callback) { * @see {@link Transaction#run} when using {@link QueryParition}. * * @param {ReadPartition|QueryParition} partition The partition object. + * @param {object} [partition.gaxOptions] Request configuration options, + * outlined here: https://googleapis.github.io/gax-nodejs/CallSettings.html. * @param {TransactionRequestReadCallback|RunCallback} callback Callback * function. * @returns {Promise|Promise} @@ -256,7 +262,7 @@ BatchTransaction.prototype.execute = function(partition, callback) { * Executes partition in streaming mode. * * @see {@link Transaction#createReadStream} when using {@link ReadPartition}. - * @see {@link Transaction#runStream} when using {@link QueryParition}. + * @see {@link Transaction#runStream} when using {@link QueryPartition}. * * @param {ReadPartition|QueryPartition} partition The partition object. * @returns {ReadableStream} A readable stream that emits rows. diff --git a/src/database.js b/src/database.js index df5ca133e..a8465d86b 100644 --- a/src/database.js +++ b/src/database.js @@ -677,7 +677,7 @@ Database.prototype.getSchema = function(callback) { * // To control how many API requests are made and page through the results * // manually, set `autoPaginate` to `false`. * //- - * function callback(err, instances, nextQuery, apiResponse) { + * function callback(err, sessions, nextQuery, apiResponse) { * if (nextQuery) { * // More results exist. 
* database.getSessions(nextQuery, callback); From fca9c6e6fde059d30457d44c94465e6cfd14b6ed Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Wed, 28 Feb 2018 15:04:51 -0500 Subject: [PATCH 4/4] update executeStream example --- src/batch-transaction.js | 51 ++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/src/batch-transaction.js b/src/batch-transaction.js index 90c3f145a..04d879734 100644 --- a/src/batch-transaction.js +++ b/src/batch-transaction.js @@ -273,29 +273,34 @@ BatchTransaction.prototype.execute = function(partition, callback) { * * const instance = spanner.instance('my-instance'); * const database = instance.database('my-database'); - * const transaction = database.batchTransaction(identifier); - * - * transaction.createReadPartitions(options, function(err, partitions) { - * const partition = partitions[0]; - * - * transaction - * .executeStream(partition) - * .on('error', function(err) {}) - * .on('data', function(row) { - * // row = [ - * // { - * // name: 'SingerId', - * // value: '1' - * // }, - * // { - * // name: 'Name', - * // value: 'Eddie Wilson' - * // } - * // ] - * }) - * .on('end', function() { - * // All results retrieved - * }); + * + * database.createBatchTransaction(function(err, transaction) { + * if (err) { + * // Error handling omitted. + * } + * + * transaction.createReadPartitions(options, function(err, partitions) { + * const partition = partitions[0]; + * + * transaction + * .executeStream(partition) + * .on('error', function(err) {}) + * .on('data', function(row) { + * // row = [ + * // { + * // name: 'SingerId', + * // value: '1' + * // }, + * // { + * // name: 'Name', + * // value: 'Eddie Wilson' + * // } + * // ] + * }) + * .on('end', function() { + * // All results retrieved + * }); + * }); * }); */ BatchTransaction.prototype.executeStream = function(partition) {