From 728679eb3ba178d50fa1c6f038e40682ba27926c Mon Sep 17 00:00:00 2001 From: snlamm Date: Wed, 20 Dec 2017 12:26:54 -0500 Subject: [PATCH 1/3] sqs integration --- package.json | 1 + src/Commands/QueueWorkCommand.js | 4 +- src/Drivers/BaseDriver.js | 10 ++ src/Drivers/SQSDriver.js | 105 +++++++++++++ src/QueueFactory.js | 25 ++-- src/QueueProvider.js | 7 +- src/Support/SQS.js | 244 +++++++++++++++++++++++++++++++ src/index.js | 2 + 8 files changed, 386 insertions(+), 12 deletions(-) create mode 100644 src/Drivers/SQSDriver.js create mode 100644 src/Support/SQS.js diff --git a/package.json b/package.json index 0cdf4e9..50467c5 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "devDependencies": { "amqplib": "^0.5.1", "ava": "^0.18.1", + "aws-sdk": "^2.172.0", "babel-cli": ">=6.7 <=7.0", "babel-eslint": "^6.1.1", "babel-preset-grind": "^0.8.0-beta.1", diff --git a/src/Commands/QueueWorkCommand.js b/src/Commands/QueueWorkCommand.js index 8bc7ed9..c84f53f 100644 --- a/src/Commands/QueueWorkCommand.js +++ b/src/Commands/QueueWorkCommand.js @@ -11,7 +11,7 @@ export class QueueWorkCommand extends Command { options = [ new InputOption('queue', InputOption.VALUE_OPTIONAL, 'Specify the queue(s) to perform work for.'), - new InputOption('concurrency', InputOption.VALUE_OPTIONAL, 'Number of jobs to process concurrency.', '1'), + new InputOption('concurrency', InputOption.VALUE_OPTIONAL, 'Number of jobs to process concurrently.', '1'), new InputOption('watch', InputOption.VALUE_OPTIONAL, 'Folders to watch for changes') ] @@ -36,7 +36,7 @@ export class QueueWorkCommand extends Command { async run() { let queues = null - const connection = this.app.queue.get(this.argument('connection')) + const connection = await this.app.queue.get(this.argument('connection')) if(this.containsOption('queue')) { queues = this.option('queue').split(/,/).map(job => job.trim()).filter(job => job.length > 0) diff --git a/src/Drivers/BaseDriver.js b/src/Drivers/BaseDriver.js index 8659dcd..506f31b 100644 --- a/src/Drivers/BaseDriver.js +++ b/src/Drivers/BaseDriver.js @@ -2,6 +2,7 @@ * Base class all drivers must extend */ export class BaseDriver { + app = null state = null retryDelay = 90000 @@ -16,6 +17,15 @@ export class BaseDriver { } } + /** + * Performs setup operations when starting the driver + * + * @return Promise + */ + ready() { + return Promise.resolve() + } + /** * Connects to the backend engine * diff --git a/src/Drivers/SQSDriver.js b/src/Drivers/SQSDriver.js new file mode 100644 index 0000000..9777801 --- /dev/null +++ b/src/Drivers/SQSDriver.js @@ -0,0 +1,105 @@ +import './BaseDriver' +import '../Support/SQS' + +/** + * AWS Simple Queue Service (SQS) backed Queue Driver + */ +export class SQSDriver extends BaseDriver { + client = null + + constructor(app, config) { + super(app, config) + + this.client = new SQS(config) + } + + ready() { + return this.client.createQueue() + } + + connect() { + return Promise.resolve() + } + + async dispatch(job) { + const payload = this.buildPayload(job) + const queueUrl = this.client.queueUrls[payload.queue] + + if(queueUrl.isNil) { + return Promise.reject(`Unable to find SQS url for job queue ${payload.queue}`) + } + + const params = { + QueueUrl: queueUrl, + MessageBody: JSON.stringify(payload), + DelaySeconds: payload.delay || 0 + } + + return this.client.put(params) + } + + listen(queues, concurrency, jobHandler, errorHandler) { + // SQS can batch ingest up to 10 jobs in a single call + const concurrentListens = Math.ceil(concurrency / 10) + const remainderJobConcurrency = 10 - ((concurrentListens * 10) - concurrency) + + const listeners = [ ] + + for(let i = 0; i < concurrentListens; i++) { + for(const queue of queues) { + let concurrentJobs = 10 + + if(i === (concurrentListens - 1)) { + concurrentJobs = remainderJobConcurrency + } + + listeners.push(this._listen(queue, concurrentJobs, jobHandler, errorHandler)) + } + } + + return Promise.all(listeners) + } + + _listen(queue, concurrency, jobHandler, errorHandler) { + return this.client.watch(queue, concurrency, async function callback(jobData, dispatchedAt, pastTries = 1) { + const job = JSON.parse(jobData) + const isExpired = timeout => { + return (timeout !== 0) && ((new Date - dispatchedAt) > timeout) + } + + try { + if(isExpired(job.timeout)) { + return + } + + await jobHandler(job) + } catch(err) { + try { + const tries = Number.parseInt(job.tries) || 1 + + if((pastTries >= tries) || isExpired(job.timeout)) { + throw err + } + + if(job.retry_delay > 0) { + await new Promise(resolve => { + return setTimeout(() => { + return resolve() + }, job.retry_delay) + }) + } + } catch(err) { + return errorHandler(job, err) + } + + return callback(jobData, dispatchedAt, pastTries += 1) + } + }) + } + + destroy() { + this.client.constructor.queueUrls = { } + return super.destroy() + } + +} diff --git a/src/QueueFactory.js b/src/QueueFactory.js index 659648f..37d387a 100644 --- a/src/QueueFactory.js +++ b/src/QueueFactory.js @@ -5,6 +5,7 @@ import './Drivers/BaseDriver' import './Drivers/BeanstalkDriver' import './Drivers/FaktoryDriver' import './Drivers/RabbitDriver' +import './Drivers/SQSDriver' export class QueueFactory { app = null @@ -16,22 +17,25 @@ export class QueueFactory { faktory: FaktoryDriver, rabbit: RabbitDriver, rabbitmq: RabbitDriver, - amqp: RabbitDriver + amqp: RabbitDriver, + sqs: SQSDriver } constructor(app) { this.app = app } - dispatch(job, connection = null) { - return this.get(connection).dispatch(job) + async dispatch(job, connection = null) { + connection = await this.get(connection) + return connection.dispatch(job) } - status(job, connection = null) { - return this.get(connection).status(job) + async status(job, connection = null) { + connection = await this.get(connection) + return connection.status(job) } - get(connection) { + async get(connection) { let name = null if(connection.isNil) { @@ -60,7 +64,7 @@ export class QueueFactory { throw new Error(`Unsupported queue driver: ${config.driver}`) } - connection = this.make(driverClass, config) + connection = await this.make(driverClass, config) if(!name.isNil) { this.connections[name] = connection @@ -69,8 +73,11 @@ export class QueueFactory { return connection } - make(driverClass, config) { - return new Queue(this.app, this, new driverClass(this.app, config)) + async make(driverClass, config) { + const driver = new driverClass(this.app, config) + await driver.ready() + + return new Queue(this.app, this, driver) } registerDriver(name, driverClass) { diff --git a/src/QueueProvider.js b/src/QueueProvider.js index 41c6b8b..bdbda86 100644 --- a/src/QueueProvider.js +++ b/src/QueueProvider.js @@ -3,10 +3,15 @@ import './QueueFactory' import './Commands/MakeJobCommand' import './Commands/QueueWorkCommand' -export function QueueProvider(app, classes = { }) { +export async function QueueProvider(app, classes = { }) { const factoryClass = classes.factoryClass || QueueFactory app.queue = new factoryClass(app) + for(const connectionName of Object.keys(app.config.get('queue.connections'))) { + // Trigger initial setup of backend engines + await app.queue.get(connectionName) + } + if(app.cli.isNil) { return } diff --git a/src/Support/SQS.js b/src/Support/SQS.js new file mode 100644 index 0000000..a633107 --- /dev/null +++ b/src/Support/SQS.js @@ -0,0 +1,244 @@ +import { MissingPackageError } from 'grind-framework' + +let sqs = null + +/** + * Loads the aws-sdk package or throws an error + * if it hasn‘t been added + */ +function loadPackage() { + if(!sqs.isNil) { + return + } + + try { + sqs = require('aws-sdk').SQS + } catch(err) { + throw new MissingPackageError('aws-sdk') + } +} + +/** + * Wrapper around AWS SQS to provide a + * promise based interface + simplifies a few ops + */ +export class SQS { + + static queueUrls = { } + + get queueUrls() { + return this.constructor.queueUrls + } + + client = null + queueConfigs = null + + constructor(config) { + loadPackage() + this.queueConfigs = config.queues + + const serviceConfig = { + region: config.region || 'us-east-1' + } + + // NOTE preferred way to access SQS is via an AWS IAM role + if(!config.access_key.isNil && !config.secret_key.isNil) { + serviceConfig.accessKeyId = config.access_key + serviceConfig.secretAccessKey = config.secret_key + } + + this.client = new sqs(serviceConfig) + } + + /* + To create queues, use the `queues` object via the config: + "connections": { + "sqs-connection": { + "driver": "sqs", + "queues": { + "test-queue-name": { ...queue options here }, + "test-queue-name2": { "QueueUrl": "https://sqs.us-east-1..." } + } + } + } + + On app start, grind-queue calls AWS to create/update your queues as necessary and grab the URLs. + For any queue options not specified, we'll use the AWS defaults. + The queue name `default` is reserved for setting queue defaults different from AWS's. + + If you want to use an SQS queue managed without grind-queue, set the QueueUrl option. + Grind-queue will use that url and, on startup, will skip the inital calls to AWS. + */ + async createQueue() { + for(const [ name, queueAttributes ] of Object.entries(this.queueConfigs)) { + // `default` queue name is reserved for setting new defaults + if(name === 'default') { + continue + } + + // If QueueUrl option is set, simply use the url + if(!queueAttributes.QueueUrl.isNil) { + this.queueUrls[name] = queueAttributes.QueueUrl + continue + } + + const defaultAttributes = Object.assign({ + // AWS defaults - set explicitly here to ovveride changes made via SQS console/cli + DelaySeconds: '0', + MaximumMessageSize: '262144', + MessageRetentionPeriod: '345600', + ReceiveMessageWaitTimeSeconds: '0', + VisibilityTimeout: '30' + }, (this.queueConfigs.default || { })) + + const params = { + QueueName: name, + Attributes: { + ...Object.assign(defaultAttributes, queueAttributes) + } + } + + await new Promise((res, rej) => this.findOrCreateQueue(res, rej, name, params)) + } + } + + async findOrCreateQueue(resolve, reject, name, params) { + let queueUrl = params.QueueUrl + + if(!queueUrl.isNil) { + // set remote url for queue + this.queueUrls[name] = queueUrl + return resolve() + } + + try { + queueUrl = await new Promise((resolve2, reject2) => { + return this.client.createQueue(params, async (err, data) => { + if(!err.isNil && (err.code === 'QueueAlreadyExists')) { + // error triggered when remote queue exists but config and remote attributes differ + await this.findAndUpdateQueue(params) + return resolve2(params.QueueUrl) + } else if(!err.isNil) { + return reject2(err) + } + + // resolves here if queue is created -or- queue exists and config and remote attributes match + return resolve2(data.QueueUrl) + }) + }) + } catch(err) { + return reject(err) + } + + // set remote url for queue + this.queueUrls[name] = queueUrl + return resolve() + } + + async findAndUpdateQueue(params) { + // Set queue url then update queue attributes + params.QueueUrl = await new Promise((resolve, reject) => { + return this.client.getQueueUrl({ QueueName: params.QueueName }, (err, data) => { + return err.isNil ? resolve(data.QueueUrl) : reject(err) + }) + }) + + return new Promise((resolve, reject) => { + delete params.QueueName + + return this.client.setQueueAttributes(params, err /* ,data */ => { + return err.isNil ? resolve() : reject(err) + }) + }) + } + + put(params) { + return new Promise((resolve, reject) => { + return this.client.sendMessage(params, (err, data) => { + return err.isNil ? resolve(data) : reject(err) + }) + }) + } + + async watch(queue, concurrency, handler) { + const queueUrl = this.queueUrls[queue] + + if(queueUrl.isNil) { + throw new Error(`Queue url not found ${queue}`) + } + + const params = { + QueueUrl: queueUrl, + MaxNumberOfMessages: concurrency, + // VisibilityTimeout: 30 - set via queue-wide VisibilityTimeout + WaitTimeSeconds: 20, + AttributeNames: [ 'SentTimestamp' ] + } + + const messages = await new Promise(resolve => { + return this.client.receiveMessage(params, (err, data) => { + if(err) { + return resolve() + } else if((data.Messages === void 0) || (data.Messages.length === 0)) { + return resolve() + } + + return resolve(data.Messages) + }) + }) + + if(Array.isArray(messages)) { + // Delete jobs from SQS so they are not reprocessed, then process them + await this.deleteFromQueue(queueUrl, messages) + await Promise.all(messages.filter(message => message.isDeleted).map(message => { + const messageData = message.Body + const dispatchedAt = new Date(message.Attributes.SentTimestamp) + + return handler(messageData, dispatchedAt) + })) + } + + // Once finished processing current jobs, re-watch the queue + return this.watch(queue, concurrency, handler) + } + + async deleteFromQueue(url, messages) { + const params = { + QueueUrl: url, + Entries: messages.map((message, i) => { + return { + Id: `${i}`, + ReceiptHandle: message.ReceiptHandle + } + }) + } + + try { + await new Promise((resolve, reject) => { + return this.client.deleteMessageBatch(params, (err, data) => { + if(!err.isNil) { + return reject(err) + } + + for(const result of data.Successful) { + messages[result.Id].isDeleted = true + } + + if(data.Failed.length > 0) { + const failed = data.Failed + const mssg = failed.map(result => `{senderFault: ${result.SenderFault}, code: ${result.Code}}`) + + Log.error( + `Failed to delete ${failed.length} messasges. Releasing back into queue. Error ${mssg}` + ) + } + + return resolve() + }) + }) + } catch(err) { + Log.error(`Failed to delete messasges. Releasing back into queue. Error ${err}`) + } + } + +} diff --git a/src/index.js b/src/index.js index 8ec281a..a52be02 100644 --- a/src/index.js +++ b/src/index.js @@ -11,6 +11,7 @@ import './Drivers/BaseDriver' import './Drivers/BeanstalkDriver' import './Drivers/FaktoryDriver' import './Drivers/RabbitDriver' +import './Drivers/SQSDriver' export { Job, @@ -23,6 +24,7 @@ export { BeanstalkDriver, FaktoryDriver, RabbitDriver, + SQSDriver, // Commands QueueWorkCommand, From cf5b3d0fb4ed3c5089c1365b9ef1dbf826d84204 Mon Sep 17 00:00:00 2001 From: snlamm Date: Tue, 9 Jan 2018 16:54:29 -0500 Subject: [PATCH 2/3] handle endpoint option in config, fix delay integer parsing, improve shutdown behavior --- package.json | 3 ++- src/Drivers/SQSDriver.js | 8 ++++++-- src/Support/SQS.js | 22 +++++++++++++++++++++- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 50467c5..54e8f77 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,8 @@ "beanstalk", "ampq", "queue", - "faktory" + "faktory", + "sqs" ], "peerDependencies": { "grind-cli": "^0.7.0", diff --git a/src/Drivers/SQSDriver.js b/src/Drivers/SQSDriver.js index 9777801..19c87ec 100644 --- a/src/Drivers/SQSDriver.js +++ b/src/Drivers/SQSDriver.js @@ -31,8 +31,11 @@ export class SQSDriver extends BaseDriver { const params = { QueueUrl: queueUrl, - MessageBody: JSON.stringify(payload), - DelaySeconds: payload.delay || 0 + MessageBody: JSON.stringify(payload) + } + + if(payload.delay > 0) { + params.DelaySeconds = Math.round(payload.delay / 1000) } return this.client.put(params) @@ -98,6 +101,7 @@ export class SQSDriver extends BaseDriver { } destroy() { + this.client.isShutdown = true this.client.constructor.queueUrls = { } return super.destroy() } diff --git a/src/Support/SQS.js b/src/Support/SQS.js index a633107..1f96b5c 100644 --- a/src/Support/SQS.js +++ b/src/Support/SQS.js @@ -32,6 +32,7 @@ export class SQS { client = null queueConfigs = null + isShutdown = false constructor(config) { loadPackage() @@ -47,6 +48,12 @@ export class SQS { serviceConfig.secretAccessKey = config.secret_key } + // Point endpoint to a mock SQS server for local testing + // for example, use elasticmq standalone server or docker image + if(!config.endpoint.isNil) { + serviceConfig.endpoint = config.endpoint + } + this.client = new sqs(serviceConfig) } @@ -161,6 +168,10 @@ export class SQS { } async watch(queue, concurrency, handler) { + if(this.isShutdown) { + return + } + const queueUrl = this.queueUrls[queue] if(queueUrl.isNil) { @@ -177,7 +188,11 @@ export class SQS { const messages = await new Promise(resolve => { return this.client.receiveMessage(params, (err, data) => { - if(err) { + if(!err.isNil) { + if(!this.isShutdown) { + Log.error(err) + } + return resolve() } else if((data.Messages === void 0) || (data.Messages.length === 0)) { return resolve() @@ -187,6 +202,11 @@ export class SQS { }) }) + // No built-in way to interrupt receiveMessage polling once it starts, so shutdown exits before handling + if(this.isShutdown) { + return + } + if(Array.isArray(messages)) { // Delete jobs from SQS so they are not reprocessed, then process them await this.deleteFromQueue(queueUrl, messages) From 020be086c7e7184280198e72463e2ea055e53206 Mon Sep 17 00:00:00 2001 From: snlamm Date: Tue, 9 Jan 2018 16:55:37 -0500 Subject: [PATCH 3/3] Setup tests for SQSDriver --- test/SQSDriver.js | 94 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 test/SQSDriver.js diff --git a/test/SQSDriver.js b/test/SQSDriver.js new file mode 100644 index 0000000..d97bd79 --- /dev/null +++ b/test/SQSDriver.js @@ -0,0 +1,94 @@ +import { serial as test } from 'ava' + +import './helpers/TestJob' +import './helpers/Listener' +import './helpers/Service' + +import '../src/Drivers/SQSDriver' + +const service = new Service(test, 'sqs', { + image: 'vsouza/sqs-local', + port: 9324 +}) +const queueName = 'test' + +test.beforeEach(async t => { + t.context.driver = new SQSDriver(null, { + endpoint: `http://localhost:${service.port}`, + access_key: 'fake-id', + secret_key: 'fake-secret', + queues: { + [queueName]: { } + } + }) + + await t.context.driver.ready() + return t.context.driver.connect() +}) + +test.afterEach.always(async t => { + const queueUrl = t.context.driver.client.queueUrls[queueName] + t.context.driver.destroy() + + // Delete the queue so it will be recreated + // This prevents the current Listener's residual long-polling from getting the next test's messages + await new Promise((resolve, reject) => { + return t.context.driver.client.client.deleteQueue({ QueueUrl: queueUrl }, err => { + if(err) { + return reject(err) + } + + return resolve() + }) + }) +}) + +test('dispatch', async t => { + const payload = { time: Date.now() } + const job = new TestJob({ ...payload }) + + await t.context.driver.dispatch(job) + + return Listener(t.context.driver, job => t.deepEqual(job.data.data, payload)) +}) + +test('retry dispatch', async t => { + const payload = { time: Date.now() } + const job = new TestJob({ ...payload }) + let tries = 0 + + await t.context.driver.dispatch(job.$tries(2)) + + return Listener(t.context.driver, job => { + t.is(job.tries, 2) + t.deepEqual(job.data.data, payload) + + if(++tries === 1 || tries > 2) { + throw new Error + } + }) +}) + +test('multi dispatch', t => { + let count = 0 + + setTimeout(() => t.context.driver.dispatch(new TestJob({ id: 1 })), 50) + setTimeout(() => t.context.driver.dispatch(new TestJob({ id: 2 })), 100) + setTimeout(() => t.context.driver.dispatch(new TestJob({ id: 3 })), 200) + setTimeout(() => t.context.driver.dispatch(new TestJob({ id: 4 })), 400) + + return Listener(t.context.driver, () => ++count < 4).then(() => t.is(count, 4)) +}) + +test('delayed dispatch', async t => { + const payload = { time: Date.now() } + const job = new TestJob({ ...payload }) + + const dispatchedAt = Date.now() + await t.context.driver.dispatch(job.$delay(1000)) + + return Listener(t.context.driver, job => { + t.is(Date.now() - dispatchedAt >= 1000, true) + t.deepEqual(job.data.data, payload) + }) +})