diff --git a/bin/run_mysql.js b/bin/run_mysql.js index 727b9bf..0d72b0f 100644 --- a/bin/run_mysql.js +++ b/bin/run_mysql.js @@ -13,52 +13,136 @@ var envVars = [ 'FACEBOOK_PASSWORD', 'AUTHORISED_USERNAME', 'DATABASE_URL', + 'METADATA_TRACKING', ]; envVars.forEach(function(name) { if (process.env[name] == null) throw new Error('Environment Variable ' + name + ' not set'); }); +var CREATE_SETTINGS_SQL = "CREATE TABLE settings ( id INT, settings_json TEXT, PRIMARY KEY(id) );"; +var CREATE_MESSAGES_SQL = "CREATE TABLE messages( fb_threadid VARCHAR(80), fb_message_id VARCHAR(80), slack_message_timestamp VARCHAR(80), fb_message_timestamp BIGINT, is_cur_read BOOL, PRIMARY KEY(fb_message_id), INDEX(fb_threadid) );"; +var pool; +function create_pool(callback){ + var settings = JSON.parse(process.env.DATABASE_URL); + settings.supportBigNumbers=true; + pool= mysql.createPool(settings); + pool.getConnection(function(err, client) { + if(err){ + return callback(new Error("Couldn't connect to mysql db: " + err.message)); + } + + createTableIfNeeded(client,"settings",CREATE_SETTINGS_SQL, function (err){ + if(err){ + client.release(); + return callback(new Error("Couldn't create the settings table: " + err.message)); + } + createTableIfNeeded(client,"messages",CREATE_MESSAGES_SQL, function (err){ + if(err){ + client.release(); + return callback(new Error("Couldn't create the messages table: " + err.message)); + } + }); + return callback(null); + }); + }); +} +//msg_obj should have the props with the column names +function insert_message(msg_obj,callback) { + pool.getConnection(function(err, client) { + if(err) + return callback(new Error("Couldn't connect to mysql db: " + err.message)); + client.query("INSERT IGNORE INTO messages(fb_threadid, fb_message_id, slack_message_timestamp, fb_message_timestamp, is_cur_read) VALUES(?,?,?,?,0)",[msg_obj.fb_threadid,msg_obj.fb_message_id,msg_obj.slack_message_timestamp,msg_obj.fb_message_timestamp], function(err, result){ + client.release(); + if(err) + return callback(new Error("Couldn't insert into the messages table: " + err.message)); + callback(null); + }); + }); +} +function message_exists(fb_message_id,callback) { + pool.getConnection(function(err, client) { + if(err) + return callback(new Error("Couldn't connect to mysql db: " + err.message)); -// Load the settings and JSON from mysql -function load_data(callback) { - var client = mysql.createConnection(JSON.parse(process.env.DATABASE_URL)); + client.query("SELECT fb_message_id FROM messages WHERE fb_message_id = ?",[fb_message_id], function(err, result){ + client.release(); + if(err) + return callback(new Error("Couldn't select from the messages table: " + err.message)); + callback(null,result.length != 0); + }); + }); +} +function get_cur_read_msg_on_thread(fb_threadid,callback) { + pool.getConnection(function(err, client) { + if(err) + return callback(new Error("Couldn't connect to mysql db: " + err.message)); - client.connect(function(err) { - if (err) { - return callback( - new Error("Couldn't connect to mysql db: " + err.message) - ); - } + client.query("SELECT slack_message_timestamp FROM messages WHERE is_cur_read=1 && fb_threadid = ?",[fb_threadid], function(err, result){ + client.release(); + if(err) + return callback(new Error("Couldn't select from the messages table: " + err.message)); + callback(null,result.length != 0 ? result[0].slack_message_timestamp : null); + }); + }); +} - client.query( - 'SELECT settings_json FROM settings WHERE id = 1', - function(err, result) { - if (err || result.length == 0) { - return callback(new Error('No settings in mysql table')); - } +//This will get the message that is on the thread that is less or equal to the fb timestamp the user read through and set that message as is_cur_read value and clear out the old one +function get_and_set_read_msg_on_thread(fb_threadid,fb_as_new_as_timestamp,callback){ + pool.getConnection(function(err, client) { + if(err) + return callback(new Error("Couldn't connect to mysql db: " + err.message)); + client.query("UPDATE messages SET is_cur_read=0 WHERE is_cur_read=1 && fb_threadid=?",[fb_threadid], function(err, result){ + if(err){ + client.release(); + return callback(new Error("Couldn't select from the messages table: " + err.message)); + } - try { - client.end(); - return callback(null, JSON.parse(result[0].settings_json)); - } catch (err) { - return callback( - 'Found results in mysql table, but failed to parse: ' + - err - ); + client.query("SELECT fb_message_id,slack_message_timestamp FROM messages WHERE fb_message_timestamp <= ? && fb_threadid = ? ORDER BY fb_message_timestamp desc LIMIT 1",[fb_as_new_as_timestamp,fb_threadid], function(err, result){ + if(err){ + client.release(); + return callback(new Error("Couldn't select from the messages table: " + err.message)); } + client.query("UPDATE messages SET is_cur_read=1 WHERE fb_message_id=?",[result.length != 0 ? result[0].fb_message_id : ""], function(err, res2){ + client.release(); + if(err) + return callback(new Error("Couldn't select from the messages table: " + err.message)); + callback(null,result.length != 0 ? result[0].slack_message_timestamp : null); + }); + }); + }); + }); + +} + +// Load the settings and JSON from mysql +function load_data(callback){ + + pool.getConnection(function(err, client) { + if(err){ + return callback(new Error("Couldn't connect to mysql db: " + err.message)); + } + + client.query("SELECT settings_json FROM settings WHERE id = 1", function(err, result){ + if(err || result.length == 0){ + client.release(); + return callback(new Error("No settings in mysql table")); } - ); + + try { + client.release(); + return callback(null, JSON.parse(result[0].settings_json)); + } catch (err){ + return callback("Found results in mysql table, but failed to parse: " + err); + } + }); }); } -function createTableIfNeeded(client, callback) { - client.query('SELECT * FROM settings LIMIT 1', function(err, result) { - if (err) { - return client.query( - 'CREATE TABLE settings (id INT, settings_json TEXT, PRIMARY KEY(id) )', - callback - ); +function createTableIfNeeded(client, table, create_sql, callback){ + client.query("SELECT * FROM " + table + " LIMIT 1", function(err, result){ + if(err) { + return client.query(create_sql, callback); } else { // table exists return callback(null); @@ -66,36 +150,20 @@ function createTableIfNeeded(client, callback) { }); } -function save_data(data, callback) { - var client = mysql.createConnection(JSON.parse(process.env.DATABASE_URL)); - - client.connect(function(err) { - if (err) { - return callback( - new Error("Couldn't connect to mysql db: " + err.message) - ); +function save_data(data, callback){ + pool.getConnection(function(err, client) { + if(err){ + return callback(new Error("Couldn't connect to mysql db: " + err.message)); } - createTableIfNeeded(client, function(err) { - if (err) { - return callback( - new Error( - "Couldn't create the settings table: " + err.message - ) - ); - } - var insertQuery = 'INSERT INTO settings(id, settings_json) VALUES (1, ?) ON DUPLICATE KEY UPDATE settings_json=VALUES(settings_json)'; - insertQuery = mysql.format(insertQuery, [JSON.stringify(data)]); - client.query(insertQuery, function(err, result) { - if (err) - return callback( - new Error( - "Couldn't insert/update settings table: " + - err.message - ) - ); - callback(); + + var insertQuery = "INSERT INTO settings(id, settings_json) VALUES (1, ?) ON DUPLICATE KEY UPDATE settings_json=VALUES(settings_json)"; + insertQuery = mysql.format(insertQuery,[JSON.stringify(data)]); + client.query(insertQuery, function(err, result){ + client.release(); + if(err) + return callback(new Error("Couldn't insert/update settings table: " + err.message)); + callback(); }); - }); }); } @@ -106,10 +174,15 @@ var settings = { debug_messages: process.env.DEBUG_MESSAGES || false, facebook: { email: process.env.FACEBOOK_EMAIL, - pass: process.env.FACEBOOK_PASSWORD, - }, -}; - -var facebot = new Facebot(settings, load_data, save_data); - -facebot.run(); + pass: process.env.FACEBOOK_PASSWORD + } +} +create_pool(function(err) { + if (err) + throw new Error("Error initializing pool of: " + err); + var meta_funcs = {}; + if (process.env.METADATA_TRACKING) + meta_funcs = {get_cur_read_msg_on_thread:get_cur_read_msg_on_thread,get_and_set_read_msg_on_thread:get_and_set_read_msg_on_thread,message_exists:message_exists,insert_message:insert_message}; + var facebot = new Facebot(settings, load_data, save_data, meta_funcs); + facebot.run(); +}); diff --git a/lib/facebot.js b/lib/facebot.js index 66bdeeb..c5a2780 100644 --- a/lib/facebot.js +++ b/lib/facebot.js @@ -8,11 +8,80 @@ var facebook = require('facebook-chat-api'); var fbUtil = require('./util'); var emoji_lib = require('js-emoji'); var emoji = new emoji_lib.EmojiConvertor(); +var extend = require('extend'); + +// https://github.com/mishk0/slack-bot-api/pull/32 +/** + * Posts a reaction (emoji) to a message by timestamp + * @param {string} id - channel ID + * @param {string} emoji - emoji string (without the : symbols) + * @param {string} ts - timestamp of the message you want to react to + * @param {object} params + * @returns {vow.Promise} + */ +slackbots.prototype.postReactionToChannel = function(id, emoji, ts, params) { + params = extend({ + channel: id, + name: emoji, + timestamp: ts + }, params || {}); + + return this._api('reactions.add', params); +}; + +/** + * Removes a reaction (emoji) by timestamp + * @param {string} id - channel ID + * @param {string} emoji - emoji string (without the : symbols) + * @param {string} ts - timestamp of the message you want to react to + * @param {object} params + * @returns {vow.Promise} + */ +slackbots.prototype.removeReactionFromChannel = function(id, emoji, ts, params) { + params = extend({ + channel: id, + name: emoji, + timestamp: ts + }, params || {}); + + return this._api('reactions.remove', params); +}; + +/** + * Returns a list of all reactions for a message (specified by timestamp) + * @param {string} id - channel ID + * @param {string} ts - timestamp of the message + * @param {object} params + * @returns {vow.Promise} + */ +slackbots.prototype.getReactions = function(id, ts, params) { + params = extend({ + channel: id, + timestamp: ts + }, params || {}); + + return this._api('reactions.get', params); +}; + +/** + * Returns a list of all items reacted to by a user + * @param {string} id - user ID + * @param {object} params + * @returns {vow.Promise} + */ +slackbots.prototype.listReactions = function(id, ts, params) { + params = extend({ + user: id + }, params || {}); + + return this._api('reactions.list', params); +}; // Load_data: function(callback(err, data)) // Save_data: function(data, callback(err)) // data: { appState: object, channelLinks: [] } -var Facebot = function Constructor(settings, load_data, save_data) { +// message_db: optional, if specified used for tracking message state +var Facebot = function Constructor(settings, load_data, save_data, message_db){ this.settings = settings; this.settings.name = this.settings.name || 'facebot'; this.user = null; @@ -20,7 +89,8 @@ var Facebot = function Constructor(settings, load_data, save_data) { this.load_data = load_data; this.save_data = save_data; - + this.message_db = message_db; + // array of { slack_channel: string id, fb_thread: string id } this.channelLinks = []; this.fb_users = {}; @@ -135,20 +205,34 @@ Facebot.prototype.saveData = function() { // Creates the FB api using either saved tokens or username // and password passed in as credentials -Facebot.prototype.createFBApi = function(credentials) { - return Q.nfcall(facebook, credentials).then(api => { - this.sendDebugMessage('Logged into facebook'); - - this.facebookApi = api; - api.setOptions({ - logLevel: 'error', - listenEvents: true, - }); - api.listen((err, fbmessage) => { - if (!err) this.handleFBNotification(fbmessage); - }); - }); -}; +Facebot.prototype.createFBApi = function(credentials){ + return Q.nfcall(facebook, credentials) + .then(api => { + this.sendDebugMessage("Logged into facebook") + + this.facebookApi = api; + this.ourFacebookId = api.getCurrentUserID(); + var opts = { + logLevel: "error", + listenEvents: true, + }; + if (this.message_db){ + opts.selfListen=true; + this.ourSenderInfo = {icon_url:`http://graph.facebook.com/${this.ourFacebookId}/picture?type=square`,username:"us"}; + fbUtil.findFBUser(this.facebookApi,this.ourFacebookId,true) + .then( friend => {this.ourSenderInfo.username=friend.name; } ) + .catch(function (error) {console.log("FIND FRIEND ERROR OF: " + error);}); + } + + api.setOptions(opts); + api.listen((err, fbmessage) => { + if(!err){ + //wait to make sure it had a chance to get added to db first... + setTimeout( ()=> this.handleFBNotification(fbmessage), 250 ); + } + }); + }); +} // loop that will continue to send the is typing indicator to a channel // until we hear back they are not typing, or 10 minutes have past @@ -169,25 +253,47 @@ Facebot.prototype.handleTypeNotification = function(fbmessage, link) { link.typing_start_time = Date.now() / 1000; this.typingLoop(link); } -}; -Facebot.prototype.postFBMessageToSlack = function(fbmessage, link) { - if (fbmessage.body !== undefined) { - var message_text = emoji.replace_emoticons_with_colons(fbmessage.body); - this.postMessage(link.slack_channel, message_text, { - username: link.fb_name, - icon_url: link.icon, +} +Facebot.prototype.postFBMessageToSlack = function (fbmessage, link) { + if (this.message_db) + promise = Q.nfcall(this.message_db.message_exists,fbmessage.messageID); + else + promise = Q.fcall(function () {return false;}); + promise.then( already_exists => { + if (already_exists) + return; + if (fbmessage.body !== undefined){ + var message_text = emoji.replace_emoticons_with_colons(fbmessage.body); + var sender_info = { username: link.fb_name, icon_url: link.icon }; + if (fbmessage.senderID == us.ourFacebookId) + sender_info = us.ourSenderInfo; + this.postMessage(link.slack_channel, + message_text, + sender_info) + .then( data => { + if (this.message_db) + this.message_db.insert_message({fb_threadid:fbmessage.threadID,fb_message_id:fbmessage.messageID,slack_message_timestamp:data.message.ts,fb_message_timestamp:fbmessage.timestamp},function(err){if (err) throw new Error("err adding of: " + err);}); }); - } - this.handleFBAttachments(fbmessage, link); -}; + } + this.handleFBAttachments(fbmessage, link); + }); +} +Facebot.prototype.handleReadReceipt = function(fbmessage, link){ + Q.nfcall(this.message_db.get_cur_read_msg_on_thread,fbmessage.threadID) + .then ( timestamp => {if (timestamp) this.removeReactionFromChannel(link.slack_channel,"eye",timestamp);return null;} ) + .then ( () => {return Q.nfcall(this.message_db.get_and_set_read_msg_on_thread,fbmessage.threadID,fbmessage.time);} ) + .then( timestamp => {return timestamp ? this.postReactionToChannel(link.slack_channel,"eye",timestamp) : null;} ) + .catch(function (error) {console.log("READ RECEIPT ERROR OF: " + error.error);}) + .done(); +} + // Handles any facebook messages/events received, formats them // and sends them through to the linked slack channels Facebot.prototype.handleFBNotification = function(fbmessage) { var threadID = undefined; - if (fbmessage.type == 'message') threadID = fbmessage.threadID; - if ( - fbmessage.type == 'typ' //not sure why typing messages appear on a different thread but they do so we need to use from... - ) + if (fbmessage.type == "message" || fbmessage.type == "read_receipt") + threadID = fbmessage.threadID; + if (fbmessage.type == "typ")//not sure why typing messages appear on a different thread but they do so we need to use from... threadID = fbmessage.from; if (!threadID) return; @@ -198,8 +304,12 @@ Facebot.prototype.handleFBNotification = function(fbmessage) { case 'typ': this.handleTypeNotification(fbmessage, link); break; - case 'message': - this.postFBMessageToSlack(fbmessage, link); + case "read_receipt": + if (this.message_db) + this.handleReadReceipt(fbmessage,link); + break; + case "message": + this.postFBMessageToSlack(fbmessage,link); break; } }); @@ -342,14 +452,14 @@ Facebot.prototype.postSlackMessagesToFB = function(message) { msg, link.fb_thread, (err, msgInfo) => { - if (err) - this.postMessage( - link.slack_channel, - 'Error sending last message: ' + err.error, - { as_user: true } - ); - } - ); + if(err) + this.postMessage(link.slack_channel, + "Error sending last message: " + err.error, + { as_user: true }); + else if (this.message_db) + this.message_db.insert_message({fb_threadid:link.fb_thread,fb_message_id:msgInfo.messageID,slack_message_timestamp:message.ts,fb_message_timestamp:msgInfo.timestamp},function(err){if (err) throw new Error("err adding of: " + err);}); + + }); }); } };