From 926bee2e6728cbec56e28450c56b8228ab184a23 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 19 Jan 2018 17:46:51 -0800 Subject: [PATCH 01/21] support segments --- evaluate_flag.js | 122 +++++++++++++++++++----- feature_store.js | 101 +------------------- in_memory_store.js | 98 +++++++++++++++++++ index.js | 9 +- polling.js | 15 +-- redis_feature_store.js | 206 +--------------------------------------- redis_segment_store.js | 5 + redis_store.js | 210 +++++++++++++++++++++++++++++++++++++++++ requestor.js | 26 ++++- segment_store.js | 3 + streaming.js | 91 ++++++++++++++---- 11 files changed, 529 insertions(+), 357 deletions(-) create mode 100644 in_memory_store.js create mode 100644 redis_segment_store.js create mode 100644 redis_store.js create mode 100644 segment_store.js diff --git a/evaluate_flag.js b/evaluate_flag.js index 0d8b104..1553647 100644 --- a/evaluate_flag.js +++ b/evaluate_flag.js @@ -7,7 +7,7 @@ var builtins = ['key', 'ip', 'country', 'email', 'firstName', 'lastName', 'avata var noop = function(){}; -function evaluate(flag, user, store, cb) { +function evaluate(flag, user, featureStore, segmentStore, cb) { cb = cb || noop; if (!user || user.key === null || user.key === undefined) { cb(null, null, null); @@ -31,7 +31,7 @@ function evaluate(flag, user, store, cb) { return; } - eval_internal(flag, user, store, [], function(err, result, events) { + eval_internal(flag, user, featureStore, segmentStore, [], function(err, result, events) { if (err) { cb(err, result, events); return; @@ -53,19 +53,19 @@ function evaluate(flag, user, store, cb) { return; } -function eval_internal(flag, user, store, events, cb) { +function eval_internal(flag, user, featureStore, segmentStore, events, cb) { // Evaluate prerequisites, if any if (flag.prerequisites) { async.mapSeries(flag.prerequisites, function(prereq, callback) { - store.get(prereq.key, function(f) { + featureStore.get(prereq.key, function(f) { // If the flag does not exist in the store or is not on, the prerequisite // is not satisfied if (!f || !f.on) { callback(new Error("Unsatisfied prerequisite"), null); return; } - eval_internal(f, user, store, events, function(err, value) { + eval_internal(f, user, featureStore, segmentStore, events, function(err, value) { // If there was an error, the value is null, the variation index is out of range, // or the value does not match the indexed variation the prerequisite is not satisfied var variation = get_variation(f, prereq.variation); @@ -86,18 +86,18 @@ function eval_internal(flag, user, store, events, cb) { cb(null, null, events); return; } - evalRules(flag, user, function(e, variation) { + evalRules(flag, user, segmentStore, function(e, variation) { cb(e, variation, events); }); }) } else { - evalRules(flag, user, function(e, variation) { + evalRules(flag, user, segmentStore, function(e, variation) { cb(e, variation, events); }); } } -function evalRules(flag, user, cb) { +function evalRules(flag, user, segmentStore, cb) { var i, j; var target; var variation; @@ -119,22 +119,29 @@ function evalRules(flag, user, cb) { } } - // Check rules - for (i = 0; i < flag.rules.length; i++) { - rule = flag.rules[i]; - if (rule_match_user(rule, user)) { - variation = variation_for_user(rule, user, flag); - cb(variation === null ? new Error("Undefined variation for flag " + flag.key) : null, variation); - return; + async.mapSeries(flag.rules, + function(rule, callback) { + rule_match_user(rule, user, segmentStore, function(matched) { + callback(matched ? 
rule : null, null); + }); + }, + function(err, results) { + // we use the "error" value to indicate that a rule was successfully matched (since we only care + // about the first match, and mapSeries terminates on the first "error") + if (err) { + var rule = err; + variation = variation_for_user(rule, user, flag); + cb(variation === null ? new Error("Undefined variation for flag " + flag.key) : null, variation); + } else { + // no rule matched; check the fallthrough + variation = variation_for_user(flag.fallthrough, user, flag); + cb(variation === null ? new Error("Undefined variation for flag " + flag.key) : null, variation); + } } - } - - // Check the fallthrough - variation = variation_for_user(flag.fallthrough, user, flag); - cb(variation === null ? new Error("Undefined variation for flag " + flag.key) : null, variation); + ); } -function rule_match_user(r, user) { +function rule_match_user(r, user, segmentStore, cb) { var i; if (!r.clauses) { @@ -142,15 +149,43 @@ function rule_match_user(r, user) { } // A rule matches if all its clauses match - for (i = 0; i < r.clauses.length; i++) { - if (!clause_match_user(r.clauses[i], user)) { - return false; + async.mapSeries(r.clauses, + function(clause, callback) { + clause_match_user(clause, user, segmentStore, function(matched) { + // on the first clause that does *not* match, we raise an "error" to stop the loop + callback(matched ? null : clause, null); + }); + }, + function(err, results) { + cb(!err); } + ); +} + +function clause_match_user(c, user, segmentStore, cb) { + if (c.op == 'segmentMatch') { + async.mapSeries(r.values, + function(value, callback) { + segmentStore.get(value, function(segment) { + if (segment && segment_match_user(segment, user)) { + // on the first segment that matches, we raise an "error" to stop the loop + callback(segment, null); + } else { + callback(null, null); + } + }); + }, + function(err, results) { + // an "error" indicates that a segment *did* match + cb(maybe_negate(c, !!err)); + } + ); + } else { + cb(clause_match_user_no_segments(c, user)); } - return true; } -function clause_match_user(c, user) { +function clause_match_user_no_segments(c, user) { var uValue; var matchFn; var i; @@ -176,6 +211,41 @@ function clause_match_user(c, user) { return maybe_negate(c, match_any(matchFn, uValue, c.values)); } +function segment_match_user(segment, user) { + if (user.key) { + if ((segment.included || []).indexOf(user.key) >= 0) { + return true; + } + if ((segment.excluded || []).indexOf(user.key) >= 0) { + return true; + } + for (var i = 0; i < (segment.rules || []).length; i++) { + if (segment_rule_match_user(segment.rules[i], user, segment.key, segment.salt)) { + return true; + } + } + } + return false; +} + +function segment_rule_match_user(rule, user, segmentKey, salt) { + for (var i = 0; i < (rule.clauses || []).length; i++) { + if (!clause_match_user_no_segments(rule.clauses[i], user)) { + return false; + } + } + + // If the weight is absent, this rule matches + if (!rule.weight) { + return true; + } + + // All of the clauses are met. See if the user buckets in + var bucket = bucket_user(user, segmentKey, rule.bucketBy || "key", salt); + var weight = rule.weight / 100000.0; + return bucket < weight; +} + function maybe_negate(c, b) { if (c.negate) { return !b; diff --git a/feature_store.js b/feature_store.js index 95cbd5e..d9e214a 100644 --- a/feature_store.js +++ b/feature_store.js @@ -1,98 +1,3 @@ -// An in-memory feature store with an async interface. -// It's async as other implementations (e.g. 
the RedisFeatureStore) -// may be async, and we want to retain interface compatibility. -var noop = function(){}; -function InMemoryFeatureStore() { - var store = {flags:{}}; - - store.get = function(key, cb) { - cb = cb || noop; - - if (this.flags.hasOwnProperty(key)) { - var flag = this.flags[key]; - - if (!flag || flag.deleted) { - cb(null); - } else { - cb(clone(flag)); - } - } else { - cb(null); - } - } - - store.all = function(cb) { - cb = cb || noop; - var results = {}; - - for (var key in this.flags) { - if (this.flags.hasOwnProperty(key)) { - var flag = this.flags[key]; - if (flag && !flag.deleted) { - results[key] = clone(flag); - } - } - } - - cb(results); - } - - store.init = function(flags, cb) { - cb = cb || noop; - this.flags = flags; - this.init_called = true; - cb(); - } - - store.delete = function(key, version, cb) { - cb = cb || noop; - - if (this.flags.hasOwnProperty(key)) { - var old = this.flags[key]; - if (old && old.version < version) { - old.deleted = true; - old.version = version; - this.flags[key] = old; - } - } else { - this.flags[key] = old; - } - - - cb(); - } - - store.upsert = function(key, flag, cb) { - cb = cb || noop; - var old = this.flags[key]; - - if (this.flags.hasOwnProperty(key)) { - var old = this.flags[key]; - if (old && old.version < flag.version) { - this.flags[key] = flag; - } - } else { - this.flags[key] = flag; - } - - cb(); - } - - store.initialized = function() { - return this.init_called === true; - } - - store.close = function() { - // Close on the in-memory store is a no-op - } - - return store; -} - -// Deep clone an object. Does not preserve any -// functions on the object -function clone(obj) { - return JSON.parse(JSON.stringify(obj)); -} - -module.exports = InMemoryFeatureStore; \ No newline at end of file +var InMemoryStore = require('./in_memory_store'); +// There's no difference in implementation between any of the in-memory stores. +module.exports = InMemoryStore; diff --git a/in_memory_store.js b/in_memory_store.js new file mode 100644 index 0000000..949389a --- /dev/null +++ b/in_memory_store.js @@ -0,0 +1,98 @@ +// An in-memory store with an async interface. +// It's async as other implementations (e.g. the RedisFeatureStore) +// may be async, and we want to retain interface compatibility. 
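// Illustrative usage sketch (not part of this diff). The store is
// callback-based even though this in-memory implementation completes
// synchronously, so every caller goes through the async surface:
//
//   var InMemoryStore = require('./in_memory_store');
//   var store = InMemoryStore();
//   store.init({ 'my-flag': { key: 'my-flag', version: 1, on: true } }, function() {
//     store.get('my-flag', function(item) {
//       // item is a deep clone of the stored object, or null if missing/deleted
//     });
//   });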
+var noop = function(){}; +function InMemoryStore() { + var store = {items:{}}; + + store.get = function(key, cb) { + cb = cb || noop; + + if (this.items.hasOwnProperty(key)) { + var item = this.items[key]; + + if (!item || item.deleted) { + cb(null); + } else { + cb(clone(item)); + } + } else { + cb(null); + } + } + + store.all = function(cb) { + cb = cb || noop; + var results = {}; + + for (var key in this.items) { + if (this.items.hasOwnProperty(key)) { + var item = this.items[key]; + if (item && !item.deleted) { + results[key] = clone(item); + } + } + } + + cb(results); + } + + store.init = function(items, cb) { + cb = cb || noop; + this.items = items; + this.init_called = true; + cb(); + } + + store.delete = function(key, version, cb) { + cb = cb || noop; + + if (this.items.hasOwnProperty(key)) { + var old = this.items[key]; + if (old && old.version < version) { + old.deleted = true; + old.version = version; + this.items[key] = old; + } + } else { + this.items[key] = old; + } + + + cb(); + } + + store.upsert = function(key, item, cb) { + cb = cb || noop; + var old = this.items[key]; + + if (this.items.hasOwnProperty(key)) { + var old = this.items[key]; + if (old && old.version < item.version) { + this.items[key] = item; + } + } else { + this.items[key] = item; + } + + cb(); + } + + store.initialized = function() { + return this.init_called === true; + } + + store.close = function() { + // Close on the in-memory store is a no-op + } + + return store; +} + +// Deep clone an object. Does not preserve any +// functions on the object +function clone(obj) { + return JSON.parse(JSON.stringify(obj)); +} + +module.exports = InMemoryStore; diff --git a/index.js b/index.js index 1250936..6149538 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,9 @@ var request = require('request'); var FeatureStoreEventWrapper = require('./feature_store_event_wrapper'); var InMemoryFeatureStore = require('./feature_store'); +var InMemorySegmentStore = require('./segment_store'); var RedisFeatureStore = require('./redis_feature_store'); +var RedisSegmentStore = require('./redis_segment_store'); var Requestor = require('./requestor'); var EventEmitter = require('events').EventEmitter; var EventSerializer = require('./event_serializer'); @@ -100,6 +102,8 @@ var new_client = function(sdk_key, config) { var featureStore = config.feature_store || InMemoryFeatureStore(); config.feature_store = FeatureStoreEventWrapper(featureStore, client); + var segmentStore = config.segment_store || InMemorySegmentStore(); + var eventSerializer = EventSerializer(config); var maybeReportError = createErrorReporter(client, config.logger); @@ -191,7 +195,7 @@ var new_client = function(sdk_key, config) { } config.feature_store.get(key, function(flag) { - evaluate.evaluate(flag, user, config.feature_store, function(err, result, events) { + evaluate.evaluate(flag, user, config.feature_store, config.segment_store, function(err, result, events) { var i; var version = flag ? 
flag.version : null; @@ -238,7 +242,7 @@ var new_client = function(sdk_key, config) { config.feature_store.all(function(flags) { async.forEachOf(flags, function(flag, key, iteratee_cb) { // At the moment, we don't send any events here - evaluate.evaluate(flag, user, config.feature_store, function(err, result, events) { + evaluate.evaluate(flag, user, config.feature_store, config.segment_store, function(err, result, events) { results[key] = result; iteratee_cb(null); }) @@ -362,6 +366,7 @@ var new_client = function(sdk_key, config) { module.exports = { init: new_client, RedisFeatureStore: RedisFeatureStore, + RedisSegmentStore: RedisSegmentStore, errors: errors }; diff --git a/polling.js b/polling.js index a608c4b..d2a3e6d 100644 --- a/polling.js +++ b/polling.js @@ -2,7 +2,8 @@ var errors = require('./errors'); function PollingProcessor(config, requestor) { var processor = {}, - store = config.feature_store, + featureStore = config.feature_store, + segmentStore = config.segment_store, stopped = false; function poll(cb) { @@ -16,7 +17,7 @@ function PollingProcessor(config, requestor) { start_time = new Date().getTime(); config.logger.debug("Polling LaunchDarkly for feature flag updates"); - requestor.request_all_flags(function(err, flags) { + requestor.request_all_data(function(err, allData) { elapsed = new Date().getTime() - start_time; sleepFor = Math.max(config.poll_interval * 1000 - elapsed, 0); config.logger.debug("Elapsed: %d ms, sleeping for %d ms", elapsed, sleepFor); @@ -29,10 +30,12 @@ function PollingProcessor(config, requestor) { setTimeout(function() { poll(cb); }, sleepFor); } } else { - store.init(JSON.parse(flags), function() { - cb(); - // Recursively call poll after the appropriate delay - setTimeout(function() { poll(cb); }, sleepFor); + featureStore.init(JSON.parse(allData.flags), function() { + segmentStore.init(JSON.parse(allData.segments), function() { + cb(); + // Recursively call poll after the appropriate delay + setTimeout(function() { poll(cb); }, sleepFor); + }); }); } }); diff --git a/redis_feature_store.js b/redis_feature_store.js index bf06f80..f63a355 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -1,207 +1,5 @@ -var redis = require('redis'), - NodeCache = require( "node-cache" ), - winston = require('winston'); +var RedisStore = require('./redis_store'); - -var noop = function(){}; - - -function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { - var client = redis.createClient(redis_opts), - store = {}, - features_key = prefix ? prefix + ":features" : "launchdarkly:features", - cache = cache_ttl ? new NodeCache({ stdTTL: cache_ttl}) : null, - inited = false, - checked_init = false; - - logger = (logger || - new winston.Logger({ - level: 'info', - transports: [ - new (winston.transports.Console)(), - ] - }) - ); - - // Allow driver programs to exit, even if the Redis - // socket is active - client.unref(); - - // A helper that performs a get with the redis client - function do_get(key, cb) { - var flag; - cb = cb || noop; - - if (cache_ttl) { - flag = cache.get(key); - if (flag) { - cb(flag); - return; - } - } - - client.hget(features_key, key, function(err, obj) { - if (err) { - logger.error("Error fetching flag from redis", err); - cb(null); - } else { - flag = JSON.parse(obj); - cb( (!flag || flag.deleted) ? 
null : flag); - } - }); - } - - store.get = function(key, cb) { - do_get(key, function(flag) { - if (flag && !flag.deleted) { - cb(flag); - } else { - cb(null); - } - }); - }; - - store.all = function(cb) { - cb = cb || noop; - client.hgetall(features_key, function(err, obj) { - if (err) { - logger.error("Error fetching flag from redis", err); - cb(null); - } else { - var results = {}, - flags = obj; - - for (var key in flags) { - if (Object.hasOwnProperty.call(flags,key)) { - var flag = JSON.parse(flags[key]); - if (!flag.deleted) { - results[key] = flag; - } - } - } - cb(results); - } - }); - }; - - store.init = function(flags, cb) { - var stringified = {}; - var multi = client.multi(); - cb = cb || noop; - - multi.del(features_key); - if (cache_ttl) { - cache.flushAll(); - } - - - for (var key in flags) { - if (Object.hasOwnProperty.call(flags,key)) { - stringified[key] = JSON.stringify(flags[key]); - } - if (cache_ttl) { - cache.set(key, flags[key]); - } - } - - multi.hmset(features_key, stringified); - - multi.exec(function(err, replies) { - if (err) { - logger.error("Error initializing redis feature store", err); - } else { - inited = true; - } - cb(); - }); - }; - - store.delete = function(key, version, cb) { - var multi; - cb = cb || noop; - client.watch(features_key); - multi = client.multi(); - - - do_get(key, function(flag) { - if (flag) { - if (flag.version >= version) { - cb(); - return; - } else { - flag.deleted = true; - flag.version = version; - multi.hset(features_key, key, JSON.stringify(flag)); - multi.exec(function(err, replies) { - if (err) { - logger.error("Error deleting feature flag", err); - } else if (cache_ttl) { - cache.set(key, flag); - } - cb(); - }); - } - } - }); - }; - - store.upsert = function(key, flag, cb) { - var multi; - cb = cb || noop; - client.watch(features_key); - multi = client.multi(); - - do_get(key, function(original) { - if (original && original.version >= flag.version) { - cb(); - return; - } - - multi.hset(features_key, key, JSON.stringify(flag)); - multi.exec(function(err, replies) { - if (err) { - logger.error("Error upserting feature flag", err); - } else { - if (cache_ttl) { - cache.set(key, flag); - } - } - cb(); - }); - - }); - }; - - store.initialized = function(cb) { - cb = cb || noop; - if (inited) { - // Once we've determined that we're initialized, we can never become uninitialized again - cb(true); - } - else if (checked_init) { - // We don't want to hit Redis for this question more than once; if we've already checked there - // and it wasn't populated, we'll continue to say we're uninited until init() has been called - cb(false); - } - else { - client.exists(features_key, function(err, obj) { - if (!err && obj) { - inited = true; - } - checked_init = true; - cb(inited); - }); - } - }; - - store.close = function() { - client.quit(); - if (cache_ttl) { - cache.close(); - } - }; - - return store; -} +var RedisFeatureStore = RedisStore("feature", ":features"); module.exports = RedisFeatureStore; \ No newline at end of file diff --git a/redis_segment_store.js b/redis_segment_store.js new file mode 100644 index 0000000..7351d7c --- /dev/null +++ b/redis_segment_store.js @@ -0,0 +1,5 @@ +var RedisStore = require('./redis_store'); + +var RedisSegmentStore = RedisStore("segment", ":segments"); + +module.exports = RedisSegmentStore; \ No newline at end of file diff --git a/redis_store.js b/redis_store.js new file mode 100644 index 0000000..247e9a4 --- /dev/null +++ b/redis_store.js @@ -0,0 +1,210 @@ +var redis = require('redis'), + 
NodeCache = require( "node-cache" ), + winston = require('winston'); + + +var noop = function(){}; + + +function RedisStore(itemName, baseKey) { + return function(redis_opts, cache_ttl, prefix, logger) { + + var client = redis.createClient(redis_opts), + store = {}, + items_key = (prefix || "launchdarkly") + baseKey, + cache = cache_ttl ? new NodeCache({ stdTTL: cache_ttl}) : null, + inited = false, + checked_init = false; + + logger = (logger || + new winston.Logger({ + level: 'info', + transports: [ + new (winston.transports.Console)(), + ] + }) + ); + + // Allow driver programs to exit, even if the Redis + // socket is active + client.unref(); + + // A helper that performs a get with the redis client + function do_get(key, cb) { + var item; + cb = cb || noop; + + if (cache_ttl) { + item = cache.get(key); + if (item) { + cb(item); + return; + } + } + + client.hget(items_key, key, function(err, obj) { + if (err) { + logger.error("Error fetching " + itemName + " from redis", err); + cb(null); + } else { + item = JSON.parse(obj); + cb( (!item || item.deleted) ? null : item); + } + }); + } + + store.get = function(key, cb) { + do_get(key, function(item) { + if (item && !item.deleted) { + cb(item); + } else { + cb(null); + } + }); + }; + + store.all = function(cb) { + cb = cb || noop; + client.hgetall(items_key, function(err, obj) { + if (err) { + logger.error("Error fetching " + itemName + " from redis", err); + cb(null); + } else { + var results = {}, + items = obj; + + for (var key in items) { + if (Object.hasOwnProperty.call(items,key)) { + var item = JSON.parse(items[key]); + if (!item.deleted) { + results[key] = item; + } + } + } + cb(results); + } + }); + }; + + store.init = function(items, cb) { + var stringified = {}; + var multi = client.multi(); + cb = cb || noop; + + multi.del(items_key); + if (cache_ttl) { + cache.flushAll(); + } + + + for (var key in items) { + if (Object.hasOwnProperty.call(items,key)) { + stringified[key] = JSON.stringify(items[key]); + } + if (cache_ttl) { + cache.set(key, items[key]); + } + } + + multi.hmset(items_key, stringified); + + multi.exec(function(err, replies) { + if (err) { + logger.error("Error initializing redis " + itemName + " store", err); + } else { + inited = true; + } + cb(); + }); + }; + + store.delete = function(key, version, cb) { + var multi; + cb = cb || noop; + client.watch(items_key); + multi = client.multi(); + + + do_get(key, function(item) { + if (item) { + if (item.version >= version) { + cb(); + return; + } else { + item.deleted = true; + item.version = version; + multi.hset(items_key, key, JSON.stringify(item)); + multi.exec(function(err, replies) { + if (err) { + logger.error("Error deleting " + itemName, err); + } else if (cache_ttl) { + cache.set(key, item); + } + cb(); + }); + } + } + }); + }; + + store.upsert = function(key, item, cb) { + var multi; + cb = cb || noop; + client.watch(items_key); + multi = client.multi(); + + do_get(key, function(original) { + if (original && original.version >= item.version) { + cb(); + return; + } + + multi.hset(items_key, key, JSON.stringify(item)); + multi.exec(function(err, replies) { + if (err) { + logger.error("Error upserting " + itemName, err); + } else { + if (cache_ttl) { + cache.set(key, item); + } + } + cb(); + }); + + }); + }; + + store.initialized = function(cb) { + cb = cb || noop; + if (inited) { + // Once we've determined that we're initialized, we can never become uninitialized again + cb(true); + } + else if (checked_init) { + // We don't want to hit Redis for this 
question more than once; if we've already checked there + // and it wasn't populated, we'll continue to say we're uninited until init() has been called + cb(false); + } + else { + client.exists(items_key, function(err, obj) { + if (!err && obj) { + inited = true; + } + checked_init = true; + cb(inited); + }); + } + }; + + store.close = function() { + client.quit(); + if (cache_ttl) { + cache.close(); + } + }; + + return store; + }; +} + +module.exports = RedisStore; \ No newline at end of file diff --git a/requestor.js b/requestor.js index ef46235..f0c471d 100644 --- a/requestor.js +++ b/requestor.js @@ -1,6 +1,6 @@ var ETagRequest = require('request-etag'); /** - * Creates a new Requestor object, which handles remote requests to fetch feature flags for LaunchDarkly. + * Creates a new Requestor object, which handles remote requests to fetch feature flags or segments for LaunchDarkly. * This is never called synchronously when requesting a feature flag for a user (e.g. via the toggle) call. * * It will be called once per second in polling mode (i.e. when streaming is disabled), or for extremely large @@ -81,6 +81,30 @@ function Requestor(sdk_key, config) { ); } + requestor.request_segment = function(key, cb) { + var req = make_request('/sdk/latest-segments/' + key); + req( + process_response(cb), + process_error_response(cb) + ); + } + + requestor.request_all_segments = function(cb) { + var req = make_request('/sdk/latest-segments'); + req( + process_response(cb), + process_error_response(cb) + ); + } + + requestor.request_all_data = function(cb) { + var req = make_request('/sdk/latest-all'); + req( + process_response(cb), + process_error_response(cb) + ); + } + return requestor; } diff --git a/segment_store.js b/segment_store.js new file mode 100644 index 0000000..d9e214a --- /dev/null +++ b/segment_store.js @@ -0,0 +1,3 @@ +var InMemoryStore = require('./in_memory_store'); +// There's no difference in implementation between any of the in-memory stores. +module.exports = InMemoryStore; diff --git a/streaming.js b/streaming.js index e2eb502..57df901 100644 --- a/streaming.js +++ b/streaming.js @@ -4,12 +4,25 @@ var EventSource = require('./eventsource'); function StreamProcessor(sdk_key, config, requestor) { var processor = {}, - store = config.feature_store, + featureStore = config.feature_store, + segmentStore = config.segment_store, es; + function getFlagKeyFromPath(path) { + return getKeyFromPath(path, '/flags/'); + } + + function getSegmentKeyFromPath(path) { + return getKeyFromPath(path, '/segments/'); + } + + function getKeyFromPath(path, prefix) { + return path.startsWith(prefix) ? 
path.substring(prefix.length) : null; + } + processor.start = function(fn) { var cb = fn || function(){}; - es = new EventSource(config.stream_uri + "/flags", + es = new EventSource(config.stream_uri + "/all", { agent: config.proxy_agent, headers: {'Authorization': sdk_key,'User-Agent': config.user_agent} @@ -22,9 +35,11 @@ function StreamProcessor(sdk_key, config, requestor) { es.addEventListener('put', function(e) { config.logger.debug('Received put event'); if (e && e.data) { - var flags = JSON.parse(e.data); - store.init(flags, function() { - cb(); + var all = JSON.parse(e.data); + featureStore.init(all.flags, function() { + segmentStore.init(all.flags, function() { + cb(); + }); }) } else { cb(new errors.LDStreamingError('Unexpected payload from event stream')); @@ -34,8 +49,16 @@ function StreamProcessor(sdk_key, config, requestor) { es.addEventListener('patch', function(e) { config.logger.debug('Received patch event'); if (e && e.data) { - var patch = JSON.parse(e.data); - store.upsert(patch.data.key, patch.data); + var patch = JSON.parse(e.data), + key = getFlagKeyFromPath(patch.path); + if (key != null) { + featureStore.upsert(key, patch.data); + } else { + key = getSegmentKeyFromPath(patch.path); + if (key != null) { + segmentStore.upsert(key, patch.data); + } + } } else { cb(new errors.LDStreamingError('Unexpected payload from event stream')); } @@ -45,10 +68,16 @@ function StreamProcessor(sdk_key, config, requestor) { config.logger.debug('Received delete event'); if (e && e.data) { var data = JSON.parse(e.data), - key = data.path.charAt(0) === '/' ? data.path.substring(1) : data.path, // trim leading '/' - version = data.version; - - store.delete(key, version); + version = data.version, + key = getFlagKeyFromPath(data.path); + if (key != null) { + featureStore.delete(key, version); + } else { + key = getSegmentKeyFromPath(patch.path); + if (key != null) { + segmentStore.delete(key, version); + } + } } else { cb(new errors.LDStreamingError('Unexpected payload from event stream')); } @@ -60,8 +89,16 @@ function StreamProcessor(sdk_key, config, requestor) { if (err) { cb(err); } else { - store.init(JSON.parse(flags), function() { - cb(); + featureStore.init(JSON.parse(flags), function() { + requestor.request_all_segments(function(err, segments) { + if (err) { + cb(err); + } else { + segmentStore.init(JSON.parse(segments), function() { + cb(); + }) + } + }); }) } }) @@ -70,14 +107,28 @@ function StreamProcessor(sdk_key, config, requestor) { es.addEventListener('indirect/patch', function(e) { config.logger.debug('Received indirect patch event') if (e && e.data) { - var key = e.data.charAt(0) === '/' ? 
e.data.substring(1) : e.data; - requestor.request_flag(key, function(err, flag) { - if (err) { - cb(new errors.LDStreamingError('Unexpected error requesting feature flag')); - } else { - store.upsert(key, JSON.parse(flag)); + var path = e.data, + key = getFlagKeyFromPath(path); + if (key != null) { + requestor.request_flag(key, function(err, flag) { + if (err) { + cb(new errors.LDStreamingError('Unexpected error requesting feature flag')); + } else { + featureStore.upsert(key, JSON.parse(flag)); + } + }); + } else { + key = getSegmentKeyFromPath(path); + if (key != null) { + requestor.request_segment(key, function(err, segment) { + if (err) { + cb(new errors.LDStreamingError('Unexpected error requesting segment')); + } else { + segmentStore.upsert(key, JSON.parse(segment)); + } + }); } - }) + } } else { cb(new errors.LDStreamingError('Unexpected payload from event stream')); } From 2957f7643a6ec890d8ad3974cc06c02b556a77f9 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 14:25:37 -0800 Subject: [PATCH 02/21] genericize feature store --- evaluate_flag.js | 27 ++--- feature_store.js | 110 ++++++++++++++++++++- in_memory_store.js | 98 ------------------ index.js | 14 +-- redis_feature_store.js | 220 ++++++++++++++++++++++++++++++++++++++++- redis_segment_store.js | 5 - versioned_data_kind.js | 20 ++++ 7 files changed, 364 insertions(+), 130 deletions(-) delete mode 100644 in_memory_store.js delete mode 100644 redis_segment_store.js create mode 100644 versioned_data_kind.js diff --git a/evaluate_flag.js b/evaluate_flag.js index 1553647..06675de 100644 --- a/evaluate_flag.js +++ b/evaluate_flag.js @@ -2,12 +2,13 @@ var operators = require('./operators'); var util = require('util'); var sha1 = require('node-sha1'); var async = require('async'); +var dataKind = require('versioned_data_kind'); var builtins = ['key', 'ip', 'country', 'email', 'firstName', 'lastName', 'avatar', 'name', 'anonymous']; var noop = function(){}; -function evaluate(flag, user, featureStore, segmentStore, cb) { +function evaluate(flag, user, featureStore, cb) { cb = cb || noop; if (!user || user.key === null || user.key === undefined) { cb(null, null, null); @@ -31,7 +32,7 @@ function evaluate(flag, user, featureStore, segmentStore, cb) { return; } - eval_internal(flag, user, featureStore, segmentStore, [], function(err, result, events) { + eval_internal(flag, user, featureStore, [], function(err, result, events) { if (err) { cb(err, result, events); return; @@ -53,19 +54,19 @@ function evaluate(flag, user, featureStore, segmentStore, cb) { return; } -function eval_internal(flag, user, featureStore, segmentStore, events, cb) { +function eval_internal(flag, user, featureStore, events, cb) { // Evaluate prerequisites, if any if (flag.prerequisites) { async.mapSeries(flag.prerequisites, function(prereq, callback) { - featureStore.get(prereq.key, function(f) { + featureStore.get(dataKind.features, prereq.key, function(f) { // If the flag does not exist in the store or is not on, the prerequisite // is not satisfied if (!f || !f.on) { callback(new Error("Unsatisfied prerequisite"), null); return; } - eval_internal(f, user, featureStore, segmentStore, events, function(err, value) { + eval_internal(f, user, featureStore, events, function(err, value) { // If there was an error, the value is null, the variation index is out of range, // or the value does not match the indexed variation the prerequisite is not satisfied var variation = get_variation(f, prereq.variation); @@ -86,18 +87,18 @@ function eval_internal(flag, 
user, featureStore, segmentStore, events, cb) { cb(null, null, events); return; } - evalRules(flag, user, segmentStore, function(e, variation) { + evalRules(flag, user, featureStore, function(e, variation) { cb(e, variation, events); }); }) } else { - evalRules(flag, user, segmentStore, function(e, variation) { + evalRules(flag, user, featureStore, function(e, variation) { cb(e, variation, events); }); } } -function evalRules(flag, user, segmentStore, cb) { +function evalRules(flag, user, featureStore, cb) { var i, j; var target; var variation; @@ -121,7 +122,7 @@ function evalRules(flag, user, segmentStore, cb) { async.mapSeries(flag.rules, function(rule, callback) { - rule_match_user(rule, user, segmentStore, function(matched) { + rule_match_user(rule, user, featureStore, function(matched) { callback(matched ? rule : null, null); }); }, @@ -141,7 +142,7 @@ function evalRules(flag, user, segmentStore, cb) { ); } -function rule_match_user(r, user, segmentStore, cb) { +function rule_match_user(r, user, featureStore, cb) { var i; if (!r.clauses) { @@ -151,7 +152,7 @@ function rule_match_user(r, user, segmentStore, cb) { // A rule matches if all its clauses match async.mapSeries(r.clauses, function(clause, callback) { - clause_match_user(clause, user, segmentStore, function(matched) { + clause_match_user(clause, user, featureStore, function(matched) { // on the first clause that does *not* match, we raise an "error" to stop the loop callback(matched ? null : clause, null); }); @@ -162,11 +163,11 @@ function rule_match_user(r, user, segmentStore, cb) { ); } -function clause_match_user(c, user, segmentStore, cb) { +function clause_match_user(c, user, featureStore, cb) { if (c.op == 'segmentMatch') { async.mapSeries(r.values, function(value, callback) { - segmentStore.get(value, function(segment) { + featureStore.get(dataKind.segments, value, function(segment) { if (segment && segment_match_user(segment, user)) { // on the first segment that matches, we raise an "error" to stop the loop callback(segment, null); diff --git a/feature_store.js b/feature_store.js index d9e214a..13a9ddb 100644 --- a/feature_store.js +++ b/feature_store.js @@ -1,3 +1,107 @@ -var InMemoryStore = require('./in_memory_store'); -// There's no difference in implementation between any of the in-memory stores. -module.exports = InMemoryStore; +// An in-memory store with an async interface. +// It's async as other implementations (e.g. the RedisFeatureStore) +// may be async, and we want to retain interface compatibility. 
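// Illustrative usage sketch (not part of this diff). The genericized store is
// parameterized by a "kind" object from versioned_data_kind, and init() now
// takes a map of kinds to item maps, mirroring what the polling and streaming
// code in this series builds:
//
//   var dataKind = require('./versioned_data_kind');
//   var store = InMemoryFeatureStore();
//   var allData = {};
//   allData[dataKind.features] = { 'my-flag': { key: 'my-flag', version: 1 } };
//   allData[dataKind.segments] = { 'my-seg': { key: 'my-seg', version: 1 } };
//   store.init(allData, function() {
//     store.get(dataKind.segments, 'my-seg', function(seg) {
//       // seg is the stored segment, or null
//     });
//   });
//
// Note that indexing allData by the kind object itself relies on JavaScript's
// string coercion of object property keys; distinct kinds need distinct string
// forms (e.g. keying by kind.namespace) for the two namespaces not to collide.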
+var noop = function(){}; +function InMemoryFeatureStore() { + var store = {allData:{}}; + + store.get = function(kind, key, cb) { + cb = cb || noop; + var items = this.allData[kind] || {}; + if items.hasOwnProperty(key) { + var item = items[key]; + + if (!item || item.deleted) { + cb(null); + } else { + cb(clone(item)); + } + } else { + cb (null); + } + } + + store.all = function(kind, cb) { + cb = cb || noop; + var results = {}; + var items = this.allData[kind] || {}; + + for (var key in items) { + if (items.hasOwnProperty(key)) { + var item = items[key]; + if (item && !item.deleted) { + results[key] = clone(item); + } + } + } + + cb(results); + } + + store.init = function(allData, cb) { + cb = cb || noop; + this.items = allData; + this.init_called = true; + cb(); + } + + store.delete = function(kind, key, version, cb) { + cb = cb || noop; + var items = this.allData[kind]; + if (!items) { + items = {}; + this.allData[kind] = items; + } + var deletedItem = { version: version, deleted: true }; + if (items.hasOwnProperty(key)) { + var old = items[key]; + if (!old || old.version < version) { + items[key] = deletedItem; + } + } else { + items[key] = deletedItem; + } + + + cb(); + } + + store.upsert = function(kind, item, cb) { + cb = cb || noop; + var key = item.key; + var items = this.allData[kind]; + if (!items) { + items = {}; + this.allData[kind] = items; + } + + if (items.hasOwnProperty(key)) { + var old = items[key]; + if (old && old.version < item.version) { + items[key] = item; + } + } else { + items[key] = item; + } + + cb(); + } + + store.initialized = function() { + return this.init_called === true; + } + + store.close = function() { + // Close on the in-memory store is a no-op + } + + return store; +} + +// Deep clone an object. Does not preserve any +// functions on the object +function clone(obj) { + return JSON.parse(JSON.stringify(obj)); +} + +module.exports = InMemoryFeatureStore; diff --git a/in_memory_store.js b/in_memory_store.js deleted file mode 100644 index 949389a..0000000 --- a/in_memory_store.js +++ /dev/null @@ -1,98 +0,0 @@ -// An in-memory store with an async interface. -// It's async as other implementations (e.g. the RedisFeatureStore) -// may be async, and we want to retain interface compatibility. 
-var noop = function(){}; -function InMemoryStore() { - var store = {items:{}}; - - store.get = function(key, cb) { - cb = cb || noop; - - if (this.items.hasOwnProperty(key)) { - var item = this.items[key]; - - if (!item || item.deleted) { - cb(null); - } else { - cb(clone(item)); - } - } else { - cb(null); - } - } - - store.all = function(cb) { - cb = cb || noop; - var results = {}; - - for (var key in this.items) { - if (this.items.hasOwnProperty(key)) { - var item = this.items[key]; - if (item && !item.deleted) { - results[key] = clone(item); - } - } - } - - cb(results); - } - - store.init = function(items, cb) { - cb = cb || noop; - this.items = items; - this.init_called = true; - cb(); - } - - store.delete = function(key, version, cb) { - cb = cb || noop; - - if (this.items.hasOwnProperty(key)) { - var old = this.items[key]; - if (old && old.version < version) { - old.deleted = true; - old.version = version; - this.items[key] = old; - } - } else { - this.items[key] = old; - } - - - cb(); - } - - store.upsert = function(key, item, cb) { - cb = cb || noop; - var old = this.items[key]; - - if (this.items.hasOwnProperty(key)) { - var old = this.items[key]; - if (old && old.version < item.version) { - this.items[key] = item; - } - } else { - this.items[key] = item; - } - - cb(); - } - - store.initialized = function() { - return this.init_called === true; - } - - store.close = function() { - // Close on the in-memory store is a no-op - } - - return store; -} - -// Deep clone an object. Does not preserve any -// functions on the object -function clone(obj) { - return JSON.parse(JSON.stringify(obj)); -} - -module.exports = InMemoryStore; diff --git a/index.js b/index.js index 390f894..39c8a9b 100644 --- a/index.js +++ b/index.js @@ -1,9 +1,7 @@ var request = require('request'); var FeatureStoreEventWrapper = require('./feature_store_event_wrapper'); var InMemoryFeatureStore = require('./feature_store'); -var InMemorySegmentStore = require('./segment_store'); var RedisFeatureStore = require('./redis_feature_store'); -var RedisSegmentStore = require('./redis_segment_store'); var Requestor = require('./requestor'); var EventEmitter = require('events').EventEmitter; var EventSerializer = require('./event_serializer'); @@ -17,6 +15,7 @@ var async = require('async'); var errors = require('./errors'); var package_json = require('./package.json'); var wrapPromiseCallback = require('./utils/wrapPromiseCallback'); +var dataKind = require('versioned_data_kind'); function createErrorReporter(emitter, logger) { return function(error) { @@ -75,8 +74,6 @@ var new_client = function(sdk_key, config) { var featureStore = config.feature_store || InMemoryFeatureStore(); config.feature_store = FeatureStoreEventWrapper(featureStore, client); - var segmentStore = config.segment_store || InMemorySegmentStore(); - var eventSerializer = EventSerializer(config); var maybeReportError = createErrorReporter(client, config.logger); @@ -167,8 +164,8 @@ var new_client = function(sdk_key, config) { } } - config.feature_store.get(key, function(flag) { - evaluate.evaluate(flag, user, config.feature_store, config.segment_store, function(err, result, events) { + config.feature_store.get(dataKind.features, key, function(flag) { + evaluate.evaluate(flag, user, config.feature_store, function(err, result, events) { var i; var version = flag ? 
flag.version : null; @@ -212,10 +209,10 @@ var new_client = function(sdk_key, config) { return resolve({}); } - config.feature_store.all(function(flags) { + config.feature_store.all(dataKind.features, function(flags) { async.forEachOf(flags, function(flag, key, iteratee_cb) { // At the moment, we don't send any events here - evaluate.evaluate(flag, user, config.feature_store, config.segment_store, function(err, result, events) { + evaluate.evaluate(flag, user, config.feature_store, function(err, result, events) { results[key] = result; iteratee_cb(null); }) @@ -339,7 +336,6 @@ var new_client = function(sdk_key, config) { module.exports = { init: new_client, RedisFeatureStore: RedisFeatureStore, - RedisSegmentStore: RedisSegmentStore, errors: errors }; diff --git a/redis_feature_store.js b/redis_feature_store.js index f63a355..63ded66 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -1,5 +1,221 @@ -var RedisStore = require('./redis_store'); +var redis = require('redis'), + NodeCache = require( "node-cache" ), + winston = require('winston'), + dataKind = require('versioned_data_kind'); -var RedisFeatureStore = RedisStore("feature", ":features"); + +var noop = function(){}; + + +function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { + + var client = redis.createClient(redis_opts), + store = {}, + items_prefix = (prefix || "launchdarkly") + ":", + cache = cache_ttl ? new NodeCache({ stdTTL: cache_ttl}) : null, + inited = false, + checked_init = false; + + logger = (logger || + new winston.Logger({ + level: 'info', + transports: [ + new (winston.transports.Console)(), + ] + }) + ); + + // Allow driver programs to exit, even if the Redis + // socket is active + client.unref(); + + function items_key(kind) { + return items_prefix + kind.namespace; + } + + function cache_key(kind, key) { + return kind.namespace + ":" + key; + } + + // A helper that performs a get with the redis client + function do_get(kind, key, cb) { + var item; + cb = cb || noop; + + if (cache_ttl) { + item = cache.get(cache_key(kind, key)); + if (item) { + cb(item); + return; + } + } + + client.hget(items_key(kind), key, function(err, obj) { + if (err) { + logger.error("Error fetching key " + key + " from redis in '" + kind.namespace + "'", err); + cb(null); + } else { + item = JSON.parse(obj); + cb(item); + } + }); + } + + store.get = function(kind, key, cb) { + do_get(kind, key, function(item) { + if (item && !item.deleted) { + cb(item); + } else { + cb(null); + } + }); + }; + + store.all = function(kind, cb) { + cb = cb || noop; + client.hgetall(items_key(kind), function(err, obj) { + if (err) { + logger.error("Error fetching '" + kind.namespace + "'' from redis", err); + cb(null); + } else { + var results = {}, + items = obj; + + for (var key in items) { + if (Object.hasOwnProperty.call(items,key)) { + var item = JSON.parse(items[key]); + if (!item.deleted) { + results[key] = item; + } + } + } + cb(results); + } + }); + }; + + store.init = function(allData, cb) { + var multi = client.multi(); + cb = cb || noop; + + if (cache_ttl) { + cache.flushAll(); + } + + for (var kind in allData) { + if (Object.hasOwnProperty.call(allData, kind)) { + var baseKey = items_key(kind); + var items = allData[kind]; + var stringified = {}; + multi.del(baseKey); + for (var key in items) { + if (Object.hasOwnProperty.call(items,key)) { + stringified[key] = JSON.stringify(items[key]); + } + if (cache_ttl) { + cache.set(cache_key(kind, key), items[key]); + } + } + multi.hmset(baseKey, stringified); + } + } + + 
multi.exec(function(err, replies) { + if (err) { + logger.error("Error initializing redis store", err); + } else { + inited = true; + } + cb(); + }); + }; + + store.delete = function(kind, key, version, cb) { + var multi; + var baseKey = items_key(kind); + cb = cb || noop; + client.watch(baseKey); + multi = client.multi(); + + do_get(kind, key, function(item) { + if (item && item.version >= version) { + cb(); + return; + } else { + deletedItem = { version: version, deleted: true }; + multi.hset(baseKey, key, JSON.stringify(deletedItem)); + multi.exec(function(err, replies) { + if (err) { + logger.error("Error deleting key " + key + " in '" + kind.namespace + "'", err); + } else if (cache_ttl) { + cache.set(cache_key(kind, key), item); + } + cb(); + }); + } + }); + }; + + store.upsert = function(kind, item, cb) { + var multi; + var baseKey = items_key(kind); + var key = item.key; + cb = cb || noop; + client.watch(baseKey); + multi = client.multi(); + + do_get(kind, key, function(original) { + if (original && original.version >= item.version) { + cb(); + return; + } + + multi.hset(baseKey, key, JSON.stringify(item)); + multi.exec(function(err, replies) { + if (err) { + logger.error("Error upserting key " + key + " in '" + kind.namespace + "'", err); + } else { + if (cache_ttl) { + cache.set(cache_key(kind, key), item); + } + } + cb(); + }); + + }); + }; + + store.initialized = function(cb) { + cb = cb || noop; + if (inited) { + // Once we've determined that we're initialized, we can never become uninitialized again + cb(true); + } + else if (checked_init) { + // We don't want to hit Redis for this question more than once; if we've already checked there + // and it wasn't populated, we'll continue to say we're uninited until init() has been called + cb(false); + } + else { + client.exists(items_key(dataKind.features), function(err, obj) { + if (!err && obj) { + inited = true; + } + checked_init = true; + cb(inited); + }); + } + }; + + store.close = function() { + client.quit(); + if (cache_ttl) { + cache.close(); + } + }; + + return store; + }; +} module.exports = RedisFeatureStore; \ No newline at end of file diff --git a/redis_segment_store.js b/redis_segment_store.js deleted file mode 100644 index 7351d7c..0000000 --- a/redis_segment_store.js +++ /dev/null @@ -1,5 +0,0 @@ -var RedisStore = require('./redis_store'); - -var RedisSegmentStore = RedisStore("segment", ":segments"); - -module.exports = RedisSegmentStore; \ No newline at end of file diff --git a/versioned_data_kind.js b/versioned_data_kind.js new file mode 100644 index 0000000..989bf80 --- /dev/null +++ b/versioned_data_kind.js @@ -0,0 +1,20 @@ + +/* + These objects denote the types of data that can be stored in the feature store and + referenced in the API. If we add another storable data type in the future, as long as it + follows the same pattern (having "key", "version", and "deleted" properties), we only need + to add a corresponding constant here and the existing store should be able to handle it. 
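  For example (an illustrative sketch of how evaluate_flag.js and index.js in
  this series use these constants; "featureStore" stands for any store that
  implements the generic interface):

    var dataKind = require('./versioned_data_kind');
    featureStore.get(dataKind.features, 'my-flag', function(flag) {
      console.log(flag);
    });
    featureStore.get(dataKind.segments, 'my-segment', function(segment) {
      console.log(segment);
    });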
+*/ + +var features = { + namespace: 'features' +}; + +var segments = { + namespace: 'segments' +}; + +module.exports = { + features: features, + segments: segments +}; From 5b71a1acaefa11eca2dc7353beb1ba8d42dcd05c Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 14:32:29 -0800 Subject: [PATCH 03/21] misc fixes --- feature_store.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/feature_store.js b/feature_store.js index 13a9ddb..8eb2b76 100644 --- a/feature_store.js +++ b/feature_store.js @@ -8,7 +8,7 @@ function InMemoryFeatureStore() { store.get = function(kind, key, cb) { cb = cb || noop; var items = this.allData[kind] || {}; - if items.hasOwnProperty(key) { + if (Object.hasOwnProperty.call(items, key)) { var item = items[key]; if (!item || item.deleted) { @@ -27,7 +27,7 @@ function InMemoryFeatureStore() { var items = this.allData[kind] || {}; for (var key in items) { - if (items.hasOwnProperty(key)) { + if (Object.hasOwnProperty.call(items, key)) { var item = items[key]; if (item && !item.deleted) { results[key] = clone(item); @@ -53,7 +53,7 @@ function InMemoryFeatureStore() { this.allData[kind] = items; } var deletedItem = { version: version, deleted: true }; - if (items.hasOwnProperty(key)) { + if (Object.hasOwnProperty.call(items, key)) { var old = items[key]; if (!old || old.version < version) { items[key] = deletedItem; @@ -75,7 +75,7 @@ function InMemoryFeatureStore() { this.allData[kind] = items; } - if (items.hasOwnProperty(key)) { + if (Object.hasOwnProperty.call(items, key)) { var old = items[key]; if (old && old.version < item.version) { items[key] = item; From 997f4ef7b4c603988b69050ecfab154a59f1238f Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 14:35:13 -0800 Subject: [PATCH 04/21] typo --- redis_feature_store.js | 1 - 1 file changed, 1 deletion(-) diff --git a/redis_feature_store.js b/redis_feature_store.js index 63ded66..af878f5 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -215,7 +215,6 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { }; return store; - }; } module.exports = RedisFeatureStore; \ No newline at end of file From 134ac1e53fa8dc521430a6063a88d1e608c5b785 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 14:39:47 -0800 Subject: [PATCH 05/21] fix package refs --- evaluate_flag.js | 2 +- index.js | 2 +- redis_feature_store.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/evaluate_flag.js b/evaluate_flag.js index 06675de..2599968 100644 --- a/evaluate_flag.js +++ b/evaluate_flag.js @@ -1,8 +1,8 @@ var operators = require('./operators'); +var dataKind = require('./versioned_data_kind'); var util = require('util'); var sha1 = require('node-sha1'); var async = require('async'); -var dataKind = require('versioned_data_kind'); var builtins = ['key', 'ip', 'country', 'email', 'firstName', 'lastName', 'avatar', 'name', 'anonymous']; diff --git a/index.js b/index.js index 39c8a9b..59396e9 100644 --- a/index.js +++ b/index.js @@ -15,7 +15,7 @@ var async = require('async'); var errors = require('./errors'); var package_json = require('./package.json'); var wrapPromiseCallback = require('./utils/wrapPromiseCallback'); -var dataKind = require('versioned_data_kind'); +var dataKind = require('./versioned_data_kind'); function createErrorReporter(emitter, logger) { return function(error) { diff --git a/redis_feature_store.js b/redis_feature_store.js index af878f5..8f73c4e 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js 
@@ -1,7 +1,7 @@ var redis = require('redis'), NodeCache = require( "node-cache" ), winston = require('winston'), - dataKind = require('versioned_data_kind'); + dataKind = require('./versioned_data_kind'); var noop = function(){}; From d55f6c7148fc2b0a7faac5855d702ed13beb8f9c Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 15:14:36 -0800 Subject: [PATCH 06/21] misc fixes --- evaluate_flag.js | 6 +++--- feature_store.js | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/evaluate_flag.js b/evaluate_flag.js index 2599968..1351630 100644 --- a/evaluate_flag.js +++ b/evaluate_flag.js @@ -165,7 +165,7 @@ function rule_match_user(r, user, featureStore, cb) { function clause_match_user(c, user, featureStore, cb) { if (c.op == 'segmentMatch') { - async.mapSeries(r.values, + async.mapSeries(c.values, function(value, callback) { featureStore.get(dataKind.segments, value, function(segment) { if (segment && segment_match_user(segment, user)) { @@ -218,7 +218,7 @@ function segment_match_user(segment, user) { return true; } if ((segment.excluded || []).indexOf(user.key) >= 0) { - return true; + return false; } for (var i = 0; i < (segment.rules || []).length; i++) { if (segment_rule_match_user(segment.rules[i], user, segment.key, segment.salt)) { @@ -237,7 +237,7 @@ function segment_rule_match_user(rule, user, segmentKey, salt) { } // If the weight is absent, this rule matches - if (!rule.weight) { + if (rule.weight === undefined || rule.weight === null) { return true; } diff --git a/feature_store.js b/feature_store.js index 8eb2b76..24e340c 100644 --- a/feature_store.js +++ b/feature_store.js @@ -40,7 +40,7 @@ function InMemoryFeatureStore() { store.init = function(allData, cb) { cb = cb || noop; - this.items = allData; + this.allData = allData; this.init_called = true; cb(); } From 64a5b2c7b88d336af17b08b481bb780e29815217 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 15:14:46 -0800 Subject: [PATCH 07/21] unit tests for evaluating segments --- test/evaluate_flag-test.js | 201 +++++++++++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 test/evaluate_flag-test.js diff --git a/test/evaluate_flag-test.js b/test/evaluate_flag-test.js new file mode 100644 index 0000000..31e015c --- /dev/null +++ b/test/evaluate_flag-test.js @@ -0,0 +1,201 @@ +var evaluate = require('../evaluate_flag'); +var InMemoryFeatureStore = require('../feature_store'); +var dataKind = require('../versioned_data_kind'); + +var featureStore = new InMemoryFeatureStore(); + +function defineSegment(segment) { + var data = {}; + data[dataKind.segments] = {}; + data[dataKind.segments][segment.key] = segment; + featureStore.init(data); + var result = featureStore.get(dataKind.segments, segment.key); +} + +function evalBooleanFlag(flag, user) { + var gotResult; + evaluate.evaluate(flag, user, featureStore, function(err, result) { + // the in-memory store isn't really async - we can count on receiving this callback before we return. 
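// If the store were genuinely async (e.g. the Redis store), evalBooleanFlag
// would return before this callback ran and the result would be undefined.
// An async-safe variant would hand the result to a callback instead; an
// illustrative sketch (not part of this diff):
//
//   function evalBooleanFlagAsync(flag, user, done) {
//     evaluate.evaluate(flag, user, featureStore, function(err, result) {
//       done(result);
//     });
//   }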
+ gotResult = result; + }) + return gotResult; +} + +function makeFlagWithSegmentMatch(segment) { + return { + key: 'flagKey', + version: 1, + on: true, + prerequisites: [], + salt: "", + targets: [], + rules: [ + { + clauses: [ + { + attribute: "", + op: "segmentMatch", + values: [ segment.key ] + } + ], + variation: 1 + } + ], + fallthrough: { + variation: 0 + }, + variations: [ false, true ] + }; +} + +describe('evaluate', function() { + + it('matches segment with explicitly included user', function() { + var segment = { + key: 'test', + included: [ 'foo' ], + version: 1 + }; + defineSegment(segment); + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo' }; + expect(evalBooleanFlag(flag, user)).toBe(true); + }); + + it('does not match segment with explicitly excluded user', function() { + var segment = { + key: 'test', + excluded: [ 'foo' ], + version: 1 + }; + defineSegment(segment); + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo' }; + expect(evalBooleanFlag(flag, user)).toBe(false); + }); + + it('does not match segment with unknown user', function() { + var segment = { + key: 'test', + included: [ 'foo' ], + version: 1 + }; + defineSegment(segment); + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'bar' }; + expect(evalBooleanFlag(flag, user)).toBe(false); + }); + + it('matches segment with user who is both included and excluded', function() { + var segment = { + key: 'test', + included: [ 'foo' ], + excluded: [ 'foo' ], + version: 1 + }; + defineSegment(segment); + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo' }; + expect(evalBooleanFlag(flag, user)).toBe(true); + }); + + it('matches segment with rule with full rollout', function() { + var segment = { + key: 'test', + rules: [ + { + clauses: [ + { + attribute: 'email', + op: 'in', + values: [ 'test@example.com' ] + } + ], + weight: 100000 + } + ], + version: 1 + }; + defineSegment(segment); + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo', email: 'test@example.com' }; + expect(evalBooleanFlag(flag, user)).toBe(true); + }); + + it('does not match segment with rule with zero rollout', function() { + var segment = { + key: 'test', + rules: [ + { + clauses: [ + { + attribute: 'email', + op: 'in', + values: [ 'test@example.com' ] + } + ], + weight: 0 + } + ], + version: 1 + }; + defineSegment(segment); + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo', email: 'test@example.com' }; + expect(evalBooleanFlag(flag, user)).toBe(false); + }); + + it('matches segment with multiple matching clauses', function() { + var segment = { + key: 'test', + rules: [ + { + clauses: [ + { + attribute: 'email', + op: 'in', + values: [ 'test@example.com' ] + }, + { + attribute: 'name', + op: 'in', + values: [ 'bob' ] + } + ] + } + ], + version: 1 + }; + defineSegment(segment); + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo', email: 'test@example.com', name: 'bob' }; + expect(evalBooleanFlag(flag, user)).toBe(true); + }); + + it('does not match segment if one clause does not match', function() { + var segment = { + key: 'test', + rules: [ + { + clauses: [ + { + attribute: 'email', + op: 'in', + values: [ 'test@example.com' ] + }, + { + attribute: 'name', + op: 'in', + values: [ 'bill' ] + } + ] + } + ], + version: 1 + }; + defineSegment(segment); + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo', email: 'test@example.com', name: 'bob' }; + expect(evalBooleanFlag(flag, 
user)).toBe(false); + }); +}); From 842ce64a6286865ad059fbe33e7e79925e79d775 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 15:23:23 -0800 Subject: [PATCH 08/21] update stream/poll processors for feature store changes --- polling.js | 14 ++++++---- requestor.js | 16 ----------- segment_store.js | 3 -- streaming.js | 63 +++++++++++++++--------------------------- versioned_data_kind.js | 6 ++-- 5 files changed, 35 insertions(+), 67 deletions(-) delete mode 100644 segment_store.js diff --git a/polling.js b/polling.js index d2a3e6d..2f85bd5 100644 --- a/polling.js +++ b/polling.js @@ -1,4 +1,5 @@ var errors = require('./errors'); +var dataKind = require('./versioned_data_kind'); function PollingProcessor(config, requestor) { var processor = {}, @@ -30,12 +31,13 @@ function PollingProcessor(config, requestor) { setTimeout(function() { poll(cb); }, sleepFor); } } else { - featureStore.init(JSON.parse(allData.flags), function() { - segmentStore.init(JSON.parse(allData.segments), function() { - cb(); - // Recursively call poll after the appropriate delay - setTimeout(function() { poll(cb); }, sleepFor); - }); + var initData = {}; + initData[dataKind.features] = allData.flags; + initData[dataKind.segments] = allData.segments; + featureStore.init(initData, function() { + cb(); + // Recursively call poll after the appropriate delay + setTimeout(function() { poll(cb); }, sleepFor); }); } }); diff --git a/requestor.js b/requestor.js index f0c471d..b0bd246 100644 --- a/requestor.js +++ b/requestor.js @@ -73,14 +73,6 @@ function Requestor(sdk_key, config) { ); } - requestor.request_all_flags = function(cb) { - var req = make_request('/sdk/latest-flags'); - req( - process_response(cb), - process_error_response(cb) - ); - } - requestor.request_segment = function(key, cb) { var req = make_request('/sdk/latest-segments/' + key); req( @@ -89,14 +81,6 @@ function Requestor(sdk_key, config) { ); } - requestor.request_all_segments = function(cb) { - var req = make_request('/sdk/latest-segments'); - req( - process_response(cb), - process_error_response(cb) - ); - } - requestor.request_all_data = function(cb) { var req = make_request('/sdk/latest-all'); req( diff --git a/segment_store.js b/segment_store.js deleted file mode 100644 index d9e214a..0000000 --- a/segment_store.js +++ /dev/null @@ -1,3 +0,0 @@ -var InMemoryStore = require('./in_memory_store'); -// There's no difference in implementation between any of the in-memory stores. -module.exports = InMemoryStore; diff --git a/streaming.js b/streaming.js index 57df901..9ecd191 100644 --- a/streaming.js +++ b/streaming.js @@ -5,19 +5,10 @@ var EventSource = require('./eventsource'); function StreamProcessor(sdk_key, config, requestor) { var processor = {}, featureStore = config.feature_store, - segmentStore = config.segment_store, es; - function getFlagKeyFromPath(path) { - return getKeyFromPath(path, '/flags/'); - } - - function getSegmentKeyFromPath(path) { - return getKeyFromPath(path, '/segments/'); - } - - function getKeyFromPath(path, prefix) { - return path.startsWith(prefix) ? path.substring(prefix.length) : null; + function getKeyFromPath(kind, path) { + return path.startsWith(kind.streamApiPath) ? 
path.substring(kind.streamApiPath.length) : null; } processor.start = function(fn) { @@ -36,11 +27,10 @@ function StreamProcessor(sdk_key, config, requestor) { config.logger.debug('Received put event'); if (e && e.data) { var all = JSON.parse(e.data); - featureStore.init(all.flags, function() { - segmentStore.init(all.flags, function() { - cb(); - }); - }) + var initData = {}; + initData[dataKind.features] = all.flags; + initData[dataKind.segments] = all.segments; + featureStore.init(initData, cb); } else { cb(new errors.LDStreamingError('Unexpected payload from event stream')); } @@ -50,13 +40,13 @@ function StreamProcessor(sdk_key, config, requestor) { config.logger.debug('Received patch event'); if (e && e.data) { var patch = JSON.parse(e.data), - key = getFlagKeyFromPath(patch.path); + key = getKeyFromPath(dataKind.features, patch.path); if (key != null) { - featureStore.upsert(key, patch.data); + featureStore.upsert(dataKind.features, patch.data); } else { - key = getSegmentKeyFromPath(patch.path); + key = getKeyFromPath(dataKind.segments, patch.path); if (key != null) { - segmentStore.upsert(key, patch.data); + featureStore.upsert(dataKind.segments, patch.data); } } } else { @@ -69,13 +59,13 @@ function StreamProcessor(sdk_key, config, requestor) { if (e && e.data) { var data = JSON.parse(e.data), version = data.version, - key = getFlagKeyFromPath(data.path); + key = getKeyFromPath(dataKind.features, data.path); if (key != null) { - featureStore.delete(key, version); + featureStore.delete(dataKind.features, key, version); } else { - key = getSegmentKeyFromPath(patch.path); + key = getKeyFromPath(dataKind.segments, patch.path); if (key != null) { - segmentStore.delete(key, version); + featureStore.delete(dataKind.segments, key, version); } } } else { @@ -85,21 +75,14 @@ function StreamProcessor(sdk_key, config, requestor) { es.addEventListener('indirect/put', function(e) { config.logger.debug('Received indirect put event') - requestor.request_all_flags(function (err, flags) { + requestor.request_all_flags(function (err, all) { if (err) { cb(err); } else { - featureStore.init(JSON.parse(flags), function() { - requestor.request_all_segments(function(err, segments) { - if (err) { - cb(err); - } else { - segmentStore.init(JSON.parse(segments), function() { - cb(); - }) - } - }); - }) + var initData = {}; + initData[dataKind.features] = all.flags; + initData[dataKind.segments] = all.segments; + featureStore.init(initData, cb); } }) }); @@ -108,23 +91,23 @@ function StreamProcessor(sdk_key, config, requestor) { config.logger.debug('Received indirect patch event') if (e && e.data) { var path = e.data, - key = getFlagKeyFromPath(path); + key = getKeyFromPath(dataKind.features, path); if (key != null) { requestor.request_flag(key, function(err, flag) { if (err) { cb(new errors.LDStreamingError('Unexpected error requesting feature flag')); } else { - featureStore.upsert(key, JSON.parse(flag)); + featureStore.upsert(dataKind.features, JSON.parse(flag)); } }); } else { - key = getSegmentKeyFromPath(path); + key = getKeyFromPath(dataKind.segments, path); if (key != null) { requestor.request_segment(key, function(err, segment) { if (err) { cb(new errors.LDStreamingError('Unexpected error requesting segment')); } else { - segmentStore.upsert(key, JSON.parse(segment)); + featureStore.upsert(dataKind.segments, JSON.parse(segment)); } }); } diff --git a/versioned_data_kind.js b/versioned_data_kind.js index 989bf80..c7b1614 100644 --- a/versioned_data_kind.js +++ b/versioned_data_kind.js @@ -7,11 +7,13 
@@ */ var features = { - namespace: 'features' + namespace: 'features', + streamApiPath: '/flags/' }; var segments = { - namespace: 'segments' + namespace: 'segments', + streamApiPath: '/segments/' }; module.exports = { From 175b961f2703e3a763b0bb699602d3446fefe4a3 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 15:56:19 -0800 Subject: [PATCH 09/21] more fixes to stream and store wrapper --- feature_store_event_wrapper.js | 29 ++++++++++++++++++----------- streaming.js | 13 +++++++++---- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/feature_store_event_wrapper.js b/feature_store_event_wrapper.js index d33f88f..7a84b05 100644 --- a/feature_store_event_wrapper.js +++ b/feature_store_event_wrapper.js @@ -1,3 +1,5 @@ +var dataKind = require('./versioned_data_kind'); + function FeatureStoreEventWrapper(featureStore, emitter) { function differ(key, oldValue, newValue) { if(newValue && oldValue && newValue.version < oldValue.version) return; @@ -13,10 +15,11 @@ function FeatureStoreEventWrapper(featureStore, emitter) { initialized: featureStore.initialized.bind(featureStore), close: featureStore.close.bind(featureStore), - init: function(newFlags, callback) { - featureStore.all(function(oldFlags){ - featureStore.init(newFlags, function(){ + init: function(newData, callback) { + featureStore.all(dataKind, function(oldFlags){ + featureStore.init(newData, function(){ var allFlags = {}; + var newFlags = newData[dataKind.features] || {}; Object.assign(allFlags, oldFlags, newFlags); var handledFlags = {}; @@ -31,19 +34,23 @@ function FeatureStoreEventWrapper(featureStore, emitter) { }); }, - delete: function(key, version, callback) { - featureStore.get(key, function(oldFlag) { - featureStore.delete(key, version, function() { - differ(key, oldFlag, {}); + delete: function(kind, key, version, callback) { + featureStore.get(kind, key, function(oldFlag) { + featureStore.delete(kind, key, version, function() { + if (kind === dataKind.features) { + differ(key, oldFlag, {}); + } callback && callback.apply(null, arguments); }); }); }, - upsert: function(key, newFlag, callback) { - featureStore.get(key, function(oldFlag) { - featureStore.upsert(key, newFlag, function() { - differ(key, oldFlag, newFlag); + upsert: function(kind, newFlag, callback) { + featureStore.get(newFlag.key, function(oldFlag) { + featureStore.upsert(kind, key, newFlag, function() { + if (kind === dataKind.features) { + differ(key, oldFlag, newFlag); + } callback && callback.apply(null, arguments); }); }); diff --git a/streaming.js b/streaming.js index 9ecd191..5db4b3a 100644 --- a/streaming.js +++ b/streaming.js @@ -1,6 +1,7 @@ var errors = require('./errors'); var EventSource = require('./eventsource'); +var dataKind = require('./versioned_data_kind'); function StreamProcessor(sdk_key, config, requestor) { var processor = {}, @@ -28,9 +29,11 @@ function StreamProcessor(sdk_key, config, requestor) { if (e && e.data) { var all = JSON.parse(e.data); var initData = {}; - initData[dataKind.features] = all.flags; - initData[dataKind.segments] = all.segments; - featureStore.init(initData, cb); + initData[dataKind.features] = all.data.flags; + initData[dataKind.segments] = all.data.segments; + featureStore.init(initData, function() { + cb(); + }); } else { cb(new errors.LDStreamingError('Unexpected payload from event stream')); } @@ -82,7 +85,9 @@ function StreamProcessor(sdk_key, config, requestor) { var initData = {}; initData[dataKind.features] = all.flags; initData[dataKind.segments] = all.segments; - 
featureStore.init(initData, cb); + featureStore.init(initData, function() { + cb(); + }); } }) }); From 77333d3064bb1623e8b1a580554f14dcd26a76b0 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 16:02:31 -0800 Subject: [PATCH 10/21] can't use object as object key --- feature_store.js | 8 +++++--- polling.js | 4 ++-- redis_feature_store.js | 3 ++- streaming.js | 8 ++++---- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/feature_store.js b/feature_store.js index 24e340c..5145882 100644 --- a/feature_store.js +++ b/feature_store.js @@ -1,3 +1,5 @@ +var dataKind = require('./versioned_data_kind'); + // An in-memory store with an async interface. // It's async as other implementations (e.g. the RedisFeatureStore) // may be async, and we want to retain interface compatibility. @@ -7,7 +9,7 @@ function InMemoryFeatureStore() { store.get = function(kind, key, cb) { cb = cb || noop; - var items = this.allData[kind] || {}; + var items = this.allData[kind.namespace] || {}; if (Object.hasOwnProperty.call(items, key)) { var item = items[key]; @@ -24,7 +26,7 @@ function InMemoryFeatureStore() { store.all = function(kind, cb) { cb = cb || noop; var results = {}; - var items = this.allData[kind] || {}; + var items = this.allData[kind.namespace] || {}; for (var key in items) { if (Object.hasOwnProperty.call(items, key)) { @@ -47,7 +49,7 @@ function InMemoryFeatureStore() { store.delete = function(kind, key, version, cb) { cb = cb || noop; - var items = this.allData[kind]; + var items = this.allData[kind.namespace]; if (!items) { items = {}; this.allData[kind] = items; diff --git a/polling.js b/polling.js index 2f85bd5..b2113d3 100644 --- a/polling.js +++ b/polling.js @@ -32,8 +32,8 @@ function PollingProcessor(config, requestor) { } } else { var initData = {}; - initData[dataKind.features] = allData.flags; - initData[dataKind.segments] = allData.segments; + initData[dataKind.features.namespace] = allData.flags; + initData[dataKind.segments.namespace] = allData.segments; featureStore.init(initData, function() { cb(); // Recursively call poll after the appropriate delay diff --git a/redis_feature_store.js b/redis_feature_store.js index 8f73c4e..037630c 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -102,7 +102,8 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { cache.flushAll(); } - for (var kind in allData) { + for (var kindNamespace in allData) { + var kind = dataKind[kindNamespace]; if (Object.hasOwnProperty.call(allData, kind)) { var baseKey = items_key(kind); var items = allData[kind]; diff --git a/streaming.js b/streaming.js index 5db4b3a..c56a16a 100644 --- a/streaming.js +++ b/streaming.js @@ -29,8 +29,8 @@ function StreamProcessor(sdk_key, config, requestor) { if (e && e.data) { var all = JSON.parse(e.data); var initData = {}; - initData[dataKind.features] = all.data.flags; - initData[dataKind.segments] = all.data.segments; + initData[dataKind.features.namespace] = all.data.flags; + initData[dataKind.segments.namespace] = all.data.segments; featureStore.init(initData, function() { cb(); }); @@ -83,8 +83,8 @@ function StreamProcessor(sdk_key, config, requestor) { cb(err); } else { var initData = {}; - initData[dataKind.features] = all.flags; - initData[dataKind.segments] = all.segments; + initData[dataKind.features.namespace] = all.flags; + initData[dataKind.segments.namespace] = all.segments; featureStore.init(initData, function() { cb(); }); From 5da08c5e26ec8dd3a62a5cf0774415542f452f4e Mon Sep 17 00:00:00 2001 From: Eli Bishop 
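
Patch 10's subject, "can't use object as object key", names the underlying JavaScript pitfall: property keys are coerced to strings, so every dataKind object collapses to the same key "[object Object]" and the second assignment silently overwrites the first. A minimal sketch of the collision and of the namespace-string fix the patch applies (illustrative only; flags and segments here are placeholders for parsed payload data):

    var dataKind = require('./versioned_data_kind');
    var flags = {}, segments = {};  // placeholders for parsed payload data

    var initData = {};
    initData[dataKind.features] = flags;      // key coerces to "[object Object]"
    initData[dataKind.segments] = segments;   // same coerced key -- clobbers the flags entry

    // Keying by the namespace string keeps the two kinds distinct:
    initData[dataKind.features.namespace] = flags;     // key is "features"
    initData[dataKind.segments.namespace] = segments;  // key is "segments"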
Date: Thu, 1 Feb 2018 16:07:20 -0800 Subject: [PATCH 11/21] misc fixes --- polling.js | 3 ++- streaming.js | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/polling.js b/polling.js index b2113d3..b15ad8c 100644 --- a/polling.js +++ b/polling.js @@ -18,7 +18,7 @@ function PollingProcessor(config, requestor) { start_time = new Date().getTime(); config.logger.debug("Polling LaunchDarkly for feature flag updates"); - requestor.request_all_data(function(err, allData) { + requestor.request_all_data(function(err, resp) { elapsed = new Date().getTime() - start_time; sleepFor = Math.max(config.poll_interval * 1000 - elapsed, 0); config.logger.debug("Elapsed: %d ms, sleeping for %d ms", elapsed, sleepFor); @@ -31,6 +31,7 @@ function PollingProcessor(config, requestor) { setTimeout(function() { poll(cb); }, sleepFor); } } else { + var allData = JSON.parse(resp); var initData = {}; initData[dataKind.features.namespace] = allData.flags; initData[dataKind.segments.namespace] = allData.segments; diff --git a/streaming.js b/streaming.js index c56a16a..cd1ed7e 100644 --- a/streaming.js +++ b/streaming.js @@ -78,10 +78,11 @@ function StreamProcessor(sdk_key, config, requestor) { es.addEventListener('indirect/put', function(e) { config.logger.debug('Received indirect put event') - requestor.request_all_flags(function (err, all) { + requestor.request_all_flags(function (err, resp) { if (err) { cb(err); } else { + var all = JSON.parse(resp); var initData = {}; initData[dataKind.features.namespace] = all.flags; initData[dataKind.segments.namespace] = all.segments; From cf93ed4241ebf4c8b8a127cc3d4c045d6ae3dbaf Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 16:08:40 -0800 Subject: [PATCH 12/21] fix tests --- test/evaluate_flag-test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/evaluate_flag-test.js b/test/evaluate_flag-test.js index 31e015c..db13d2c 100644 --- a/test/evaluate_flag-test.js +++ b/test/evaluate_flag-test.js @@ -6,8 +6,8 @@ var featureStore = new InMemoryFeatureStore(); function defineSegment(segment) { var data = {}; - data[dataKind.segments] = {}; - data[dataKind.segments][segment.key] = segment; + data[dataKind.segments.namespace] = {}; + data[dataKind.segments.namespace][segment.key] = segment; featureStore.init(data); var result = featureStore.get(dataKind.segments, segment.key); } From 07c759bb00c4b034f44ce6d2ce9618536b343cfe Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 16:27:38 -0800 Subject: [PATCH 13/21] make callback null-tolerant --- redis_feature_store.js | 1 + 1 file changed, 1 insertion(+) diff --git a/redis_feature_store.js b/redis_feature_store.js index 037630c..21eff56 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -62,6 +62,7 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { } store.get = function(kind, key, cb) { + cb = cb || noop; do_get(kind, key, function(item) { if (item && !item.deleted) { cb(item); From 6086bec2e0667df7f5892bcf9993799626ed3286 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 17:00:34 -0800 Subject: [PATCH 14/21] add temp debug logging --- streaming.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/streaming.js b/streaming.js index cd1ed7e..113ef44 100644 --- a/streaming.js +++ b/streaming.js @@ -45,10 +45,12 @@ function StreamProcessor(sdk_key, config, requestor) { var patch = JSON.parse(e.data), key = getKeyFromPath(dataKind.features, patch.path); if (key != null) { + config.logger.debug('Updating flag ' + 
patch.data.key); featureStore.upsert(dataKind.features, patch.data); } else { key = getKeyFromPath(dataKind.segments, patch.path); if (key != null) { + config.logger.debug('Updating segment ' + patch.data.key); featureStore.upsert(dataKind.segments, patch.data); } } @@ -64,10 +66,12 @@ function StreamProcessor(sdk_key, config, requestor) { version = data.version, key = getKeyFromPath(dataKind.features, data.path); if (key != null) { + config.logger.debug('Deleting flag ' + key); featureStore.delete(dataKind.features, key, version); } else { key = getKeyFromPath(dataKind.segments, patch.path); if (key != null) { + config.logger.debug('Deleting segment ' + key); featureStore.delete(dataKind.segments, key, version); } } From f8687a8bf32989459969c64b00e12a821802a7d8 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 17:42:48 -0800 Subject: [PATCH 15/21] further cleanup of streaming & polling logic --- feature_store.js | 2 +- feature_store_event_wrapper.js | 8 ++--- requestor.js | 14 ++------ streaming.js | 64 ++++++++++++++-------------------- versioned_data_kind.js | 6 ++-- 5 files changed, 39 insertions(+), 55 deletions(-) diff --git a/feature_store.js b/feature_store.js index 5145882..27f0323 100644 --- a/feature_store.js +++ b/feature_store.js @@ -71,7 +71,7 @@ function InMemoryFeatureStore() { store.upsert = function(kind, item, cb) { cb = cb || noop; var key = item.key; - var items = this.allData[kind]; + var items = this.allData[kind.namespace]; if (!items) { items = {}; this.allData[kind] = items; diff --git a/feature_store_event_wrapper.js b/feature_store_event_wrapper.js index 7a84b05..087a021 100644 --- a/feature_store_event_wrapper.js +++ b/feature_store_event_wrapper.js @@ -45,11 +45,11 @@ function FeatureStoreEventWrapper(featureStore, emitter) { }); }, - upsert: function(kind, newFlag, callback) { - featureStore.get(newFlag.key, function(oldFlag) { - featureStore.upsert(kind, key, newFlag, function() { + upsert: function(kind, newItem, callback) { + featureStore.get(kind, newItem.key, function(oldItem) { + featureStore.upsert(kind, newItem, function() { if (kind === dataKind.features) { - differ(key, oldFlag, newFlag); + differ(oldItem ? 
oldItem.key : null, oldItem, newItem); } callback && callback.apply(null, arguments); }); diff --git a/requestor.js b/requestor.js index b0bd246..107d065 100644 --- a/requestor.js +++ b/requestor.js @@ -65,21 +65,13 @@ function Requestor(sdk_key, config) { } } - requestor.request_flag = function(key, cb) { - var req = make_request('/sdk/latest-flags/' + key); + requestor.request_object = function(kind, key, cb) { + var req = make_request(kind.requestPath + key); req( process_response(cb), process_error_response(cb) ); - } - - requestor.request_segment = function(key, cb) { - var req = make_request('/sdk/latest-segments/' + key); - req( - process_response(cb), - process_error_response(cb) - ); - } + } requestor.request_all_data = function(cb) { var req = make_request('/sdk/latest-all'); diff --git a/streaming.js b/streaming.js index 113ef44..5bf1c84 100644 --- a/streaming.js +++ b/streaming.js @@ -42,16 +42,14 @@ function StreamProcessor(sdk_key, config, requestor) { es.addEventListener('patch', function(e) { config.logger.debug('Received patch event'); if (e && e.data) { - var patch = JSON.parse(e.data), - key = getKeyFromPath(dataKind.features, patch.path); - if (key != null) { - config.logger.debug('Updating flag ' + patch.data.key); - featureStore.upsert(dataKind.features, patch.data); - } else { - key = getKeyFromPath(dataKind.segments, patch.path); + var patch = JSON.parse(e.data); + for (var k in dataKind) { + var kind = dataKind[k]; + var key = getKeyFromPath(kind, patch.path); if (key != null) { - config.logger.debug('Updating segment ' + patch.data.key); - featureStore.upsert(dataKind.segments, patch.data); + config.logger.debug('Updating ' + key + ' in ' + kind.namespace); + featureStore.upsert(kind, patch.data); + break; } } } else { @@ -63,16 +61,14 @@ function StreamProcessor(sdk_key, config, requestor) { config.logger.debug('Received delete event'); if (e && e.data) { var data = JSON.parse(e.data), - version = data.version, - key = getKeyFromPath(dataKind.features, data.path); - if (key != null) { - config.logger.debug('Deleting flag ' + key); - featureStore.delete(dataKind.features, key, version); - } else { - key = getKeyFromPath(dataKind.segments, patch.path); + version = data.version; + for (var k in dataKind) { + var kind = dataKind[k]; + var key = getKeyFromPath(kind, data.path); if (key != null) { - config.logger.debug('Deleting segment ' + key); - featureStore.delete(dataKind.segments, key, version); + config.logger.debug('Deleting ' + key + ' in ' + kind.namespace); + featureStore.delete(kind, key, version); + break; } } } else { @@ -100,26 +96,20 @@ function StreamProcessor(sdk_key, config, requestor) { es.addEventListener('indirect/patch', function(e) { config.logger.debug('Received indirect patch event') if (e && e.data) { - var path = e.data, - key = getKeyFromPath(dataKind.features, path); - if (key != null) { - requestor.request_flag(key, function(err, flag) { - if (err) { - cb(new errors.LDStreamingError('Unexpected error requesting feature flag')); - } else { - featureStore.upsert(dataKind.features, JSON.parse(flag)); - } - }); - } else { - key = getKeyFromPath(dataKind.segments, path); + var path = e.data; + for (var k in dataKind) { + var kind = dataKind[k]; + var key = getKeyFromPath(kind, patch.path); if (key != null) { - requestor.request_segment(key, function(err, segment) { - if (err) { - cb(new errors.LDStreamingError('Unexpected error requesting segment')); - } else { - featureStore.upsert(dataKind.segments, JSON.parse(segment)); - } - }); + 
requestor.request_object(kind, key, function(err, resp) { + if (err) { + cb(new errors.LDStreamingError('Unexpected error requesting ' + key + ' in ' + kind.namespace)); + } else { + config.logger.debug('Updating ' + key + ' in ' + kind.namespace); + featureStore.upsert(kind, JSON.parse(resp)); + } + }); + break; } } } else { diff --git a/versioned_data_kind.js b/versioned_data_kind.js index c7b1614..68876f4 100644 --- a/versioned_data_kind.js +++ b/versioned_data_kind.js @@ -8,12 +8,14 @@ var features = { namespace: 'features', - streamApiPath: '/flags/' + streamApiPath: '/flags/', + requestPath: '/sdk/latest-flags/' }; var segments = { namespace: 'segments', - streamApiPath: '/segments/' + streamApiPath: '/segments/', + requestPath: '/sdk/latest-segments/' }; module.exports = { From 4c060efad2829d54dcf34d9b998993b5e4b3c873 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 17:53:02 -0800 Subject: [PATCH 16/21] fix indirect patch --- streaming.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming.js b/streaming.js index 5bf1c84..979c9f6 100644 --- a/streaming.js +++ b/streaming.js @@ -99,7 +99,7 @@ function StreamProcessor(sdk_key, config, requestor) { var path = e.data; for (var k in dataKind) { var kind = dataKind[k]; - var key = getKeyFromPath(kind, patch.path); + var key = getKeyFromPath(kind, path); if (key != null) { requestor.request_object(kind, key, function(err, resp) { if (err) { From 14a3fb26824f0c0b9d1029aaf18379110e258f46 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 2 Feb 2018 10:54:52 -0800 Subject: [PATCH 17/21] misc fixes to feature store --- feature_store.js | 5 +- redis_feature_store.js | 397 +++++++++++++++++++++-------------------- redis_store.js | 210 ---------------------- 3 files changed, 202 insertions(+), 410 deletions(-) delete mode 100644 redis_store.js diff --git a/feature_store.js b/feature_store.js index 27f0323..f8b3247 100644 --- a/feature_store.js +++ b/feature_store.js @@ -89,8 +89,9 @@ function InMemoryFeatureStore() { cb(); } - store.initialized = function() { - return this.init_called === true; + store.initialized = function(cb) { + cb = cb || noop; + cb(this.init_called === true); } store.close = function() { diff --git a/redis_feature_store.js b/redis_feature_store.js index 21eff56..03fac3b 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -9,214 +9,215 @@ var noop = function(){}; function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { - var client = redis.createClient(redis_opts), - store = {}, - items_prefix = (prefix || "launchdarkly") + ":", - cache = cache_ttl ? 
new NodeCache({ stdTTL: cache_ttl}) : null, - inited = false, - checked_init = false; - - logger = (logger || - new winston.Logger({ - level: 'info', - transports: [ - new (winston.transports.Console)(), - ] - }) - ); - - // Allow driver programs to exit, even if the Redis - // socket is active - client.unref(); - - function items_key(kind) { - return items_prefix + kind.namespace; - } - - function cache_key(kind, key) { - return kind.namespace + ":" + key; - } - - // A helper that performs a get with the redis client - function do_get(kind, key, cb) { - var item; - cb = cb || noop; - - if (cache_ttl) { - item = cache.get(cache_key(kind, key)); - if (item) { - cb(item); - return; - } - } - - client.hget(items_key(kind), key, function(err, obj) { - if (err) { - logger.error("Error fetching key " + key + " from redis in '" + kind.namespace + "'", err); - cb(null); - } else { - item = JSON.parse(obj); - cb(item); - } - }); - } - - store.get = function(kind, key, cb) { - cb = cb || noop; - do_get(kind, key, function(item) { - if (item && !item.deleted) { - cb(item); - } else { - cb(null); - } - }); - }; - - store.all = function(kind, cb) { - cb = cb || noop; - client.hgetall(items_key(kind), function(err, obj) { - if (err) { - logger.error("Error fetching '" + kind.namespace + "'' from redis", err); - cb(null); - } else { - var results = {}, - items = obj; - - for (var key in items) { - if (Object.hasOwnProperty.call(items,key)) { - var item = JSON.parse(items[key]); - if (!item.deleted) { - results[key] = item; - } - } - } - cb(results); - } - }); - }; - - store.init = function(allData, cb) { - var multi = client.multi(); - cb = cb || noop; - - if (cache_ttl) { - cache.flushAll(); - } - - for (var kindNamespace in allData) { - var kind = dataKind[kindNamespace]; - if (Object.hasOwnProperty.call(allData, kind)) { - var baseKey = items_key(kind); - var items = allData[kind]; - var stringified = {}; - multi.del(baseKey); - for (var key in items) { - if (Object.hasOwnProperty.call(items,key)) { - stringified[key] = JSON.stringify(items[key]); - } - if (cache_ttl) { - cache.set(cache_key(kind, key), items[key]); - } - } - multi.hmset(baseKey, stringified); - } - } - - multi.exec(function(err, replies) { - if (err) { - logger.error("Error initializing redis store", err); - } else { - inited = true; - } - cb(); - }); - }; - - store.delete = function(kind, key, version, cb) { - var multi; - var baseKey = items_key(kind); - cb = cb || noop; - client.watch(baseKey); - multi = client.multi(); - - do_get(kind, key, function(item) { - if (item && item.version >= version) { - cb(); + var client = redis.createClient(redis_opts), + store = {}, + items_prefix = (prefix || "launchdarkly") + ":", + cache = cache_ttl ? 
new NodeCache({ stdTTL: cache_ttl}) : null, + inited = false, + checked_init = false; + + logger = (logger || + new winston.Logger({ + level: 'info', + transports: [ + new (winston.transports.Console)(), + ] + }) + ); + + // Allow driver programs to exit, even if the Redis + // socket is active + client.unref(); + + function items_key(kind) { + return items_prefix + kind.namespace; + } + + function cache_key(kind, key) { + return kind.namespace + ":" + key; + } + + // A helper that performs a get with the redis client + function do_get(kind, key, cb) { + var item; + cb = cb || noop; + + if (cache_ttl) { + item = cache.get(cache_key(kind, key)); + if (item) { + cb(item); return; + } + } + + client.hget(items_key(kind), key, function(err, obj) { + if (err) { + logger.error("Error fetching key " + key + " from redis in '" + kind.namespace + "'", err); + cb(null); + } else { + item = JSON.parse(obj); + cb(item); + } + }); + } + + store.get = function(kind, key, cb) { + cb = cb || noop; + do_get(kind, key, function(item) { + if (item && !item.deleted) { + cb(item); + } else { + cb(null); + } + }); + }; + + store.all = function(kind, cb) { + cb = cb || noop; + client.hgetall(items_key(kind), function(err, obj) { + if (err) { + logger.error("Error fetching '" + kind.namespace + "'' from redis", err); + cb(null); + } else { + var results = {}, + items = obj; + + for (var key in items) { + if (Object.hasOwnProperty.call(items,key)) { + var item = JSON.parse(items[key]); + if (!item.deleted) { + results[key] = item; + } + } + } + cb(results); + } + }); + }; + + store.init = function(allData, cb) { + var multi = client.multi(); + cb = cb || noop; + + if (cache_ttl) { + cache.flushAll(); + } + + for (var kindNamespace in allData) { + if (Object.hasOwnProperty.call(allData, kindNamespace)) { + var kind = dataKind[kindNamespace]; + var baseKey = items_key(kind); + var items = allData[kindNamespace]; + var stringified = {}; + multi.del(baseKey); + for (var key in items) { + if (Object.hasOwnProperty.call(items, key)) { + stringified[key] = JSON.stringify(items[key]); + } + if (cache_ttl) { + cache.set(cache_key(kind, key), items[key]); + } + } + multi.hmset(baseKey, stringified); + } + } + + multi.exec(function(err, replies) { + if (err) { + logger.error("Error initializing redis store", err); } else { - deletedItem = { version: version, deleted: true }; + inited = true; + } + cb(); + }); + }; + + store.delete = function(kind, key, version, cb) { + var multi; + var baseKey = items_key(kind); + cb = cb || noop; + client.watch(baseKey); + multi = client.multi(); + + do_get(kind, key, function(item) { + if (item && item.version >= version) { + multi.discard(); + cb(); + } else { + deletedItem = { version: version, deleted: true }; multi.hset(baseKey, key, JSON.stringify(deletedItem)); multi.exec(function(err, replies) { if (err) { logger.error("Error deleting key " + key + " in '" + kind.namespace + "'", err); } else if (cache_ttl) { - cache.set(cache_key(kind, key), item); + cache.set(cache_key(kind, key), deletedItem); } cb(); }); - } - }); - }; - - store.upsert = function(kind, item, cb) { - var multi; - var baseKey = items_key(kind); - var key = item.key; - cb = cb || noop; - client.watch(baseKey); - multi = client.multi(); - - do_get(kind, key, function(original) { - if (original && original.version >= item.version) { - cb(); - return; - } - - multi.hset(baseKey, key, JSON.stringify(item)); - multi.exec(function(err, replies) { - if (err) { - logger.error("Error upserting key " + key + " in '" + 
kind.namespace + "'", err); - } else { - if (cache_ttl) { - cache.set(cache_key(kind, key), item); - } - } - cb(); - }); - - }); - }; - - store.initialized = function(cb) { - cb = cb || noop; - if (inited) { - // Once we've determined that we're initialized, we can never become uninitialized again - cb(true); - } - else if (checked_init) { - // We don't want to hit Redis for this question more than once; if we've already checked there - // and it wasn't populated, we'll continue to say we're uninited until init() has been called - cb(false); - } - else { - client.exists(items_key(dataKind.features), function(err, obj) { - if (!err && obj) { - inited = true; - } - checked_init = true; - cb(inited); - }); - } - }; - - store.close = function() { - client.quit(); - if (cache_ttl) { - cache.close(); - } - }; - - return store; + } + }); + }; + + store.upsert = function(kind, item, cb) { + var multi; + var baseKey = items_key(kind); + var key = item.key; + cb = cb || noop; + client.watch(baseKey); + multi = client.multi(); + + do_get(kind, key, function(original) { + if (original && original.version >= item.version) { + cb(); + return; + } + + multi.hset(baseKey, key, JSON.stringify(item)); + multi.exec(function(err, replies) { + if (err) { + logger.error("Error upserting key " + key + " in '" + kind.namespace + "'", err); + } else { + if (cache_ttl) { + cache.set(cache_key(kind, key), item); + } + } + cb(); + }); + + }); + }; + + store.initialized = function(cb) { + cb = cb || noop; + if (inited) { + // Once we've determined that we're initialized, we can never become uninitialized again + cb(true); + } + else if (checked_init) { + // We don't want to hit Redis for this question more than once; if we've already checked there + // and it wasn't populated, we'll continue to say we're uninited until init() has been called + cb(false); + } + else { + var inited = false; + client.exists(items_key(dataKind.features), function(err, obj) { + if (!err && obj) { + inited = true; + } + checked_init = true; + cb(inited); + }); + } + }; + + store.close = function() { + client.quit(); + if (cache_ttl) { + cache.close(); + } + }; + + return store; } module.exports = RedisFeatureStore; \ No newline at end of file diff --git a/redis_store.js b/redis_store.js deleted file mode 100644 index 247e9a4..0000000 --- a/redis_store.js +++ /dev/null @@ -1,210 +0,0 @@ -var redis = require('redis'), - NodeCache = require( "node-cache" ), - winston = require('winston'); - - -var noop = function(){}; - - -function RedisStore(itemName, baseKey) { - return function(redis_opts, cache_ttl, prefix, logger) { - - var client = redis.createClient(redis_opts), - store = {}, - items_key = (prefix || "launchdarkly") + baseKey, - cache = cache_ttl ? new NodeCache({ stdTTL: cache_ttl}) : null, - inited = false, - checked_init = false; - - logger = (logger || - new winston.Logger({ - level: 'info', - transports: [ - new (winston.transports.Console)(), - ] - }) - ); - - // Allow driver programs to exit, even if the Redis - // socket is active - client.unref(); - - // A helper that performs a get with the redis client - function do_get(key, cb) { - var item; - cb = cb || noop; - - if (cache_ttl) { - item = cache.get(key); - if (item) { - cb(item); - return; - } - } - - client.hget(items_key, key, function(err, obj) { - if (err) { - logger.error("Error fetching " + itemName + " from redis", err); - cb(null); - } else { - item = JSON.parse(obj); - cb( (!item || item.deleted) ? 
null : item); - } - }); - } - - store.get = function(key, cb) { - do_get(key, function(item) { - if (item && !item.deleted) { - cb(item); - } else { - cb(null); - } - }); - }; - - store.all = function(cb) { - cb = cb || noop; - client.hgetall(items_key, function(err, obj) { - if (err) { - logger.error("Error fetching " + itemName + " from redis", err); - cb(null); - } else { - var results = {}, - items = obj; - - for (var key in items) { - if (Object.hasOwnProperty.call(items,key)) { - var item = JSON.parse(items[key]); - if (!item.deleted) { - results[key] = item; - } - } - } - cb(results); - } - }); - }; - - store.init = function(items, cb) { - var stringified = {}; - var multi = client.multi(); - cb = cb || noop; - - multi.del(items_key); - if (cache_ttl) { - cache.flushAll(); - } - - - for (var key in items) { - if (Object.hasOwnProperty.call(items,key)) { - stringified[key] = JSON.stringify(items[key]); - } - if (cache_ttl) { - cache.set(key, items[key]); - } - } - - multi.hmset(items_key, stringified); - - multi.exec(function(err, replies) { - if (err) { - logger.error("Error initializing redis " + itemName + " store", err); - } else { - inited = true; - } - cb(); - }); - }; - - store.delete = function(key, version, cb) { - var multi; - cb = cb || noop; - client.watch(items_key); - multi = client.multi(); - - - do_get(key, function(item) { - if (item) { - if (item.version >= version) { - cb(); - return; - } else { - item.deleted = true; - item.version = version; - multi.hset(items_key, key, JSON.stringify(item)); - multi.exec(function(err, replies) { - if (err) { - logger.error("Error deleting " + itemName, err); - } else if (cache_ttl) { - cache.set(key, item); - } - cb(); - }); - } - } - }); - }; - - store.upsert = function(key, item, cb) { - var multi; - cb = cb || noop; - client.watch(items_key); - multi = client.multi(); - - do_get(key, function(original) { - if (original && original.version >= item.version) { - cb(); - return; - } - - multi.hset(items_key, key, JSON.stringify(item)); - multi.exec(function(err, replies) { - if (err) { - logger.error("Error upserting " + itemName, err); - } else { - if (cache_ttl) { - cache.set(key, item); - } - } - cb(); - }); - - }); - }; - - store.initialized = function(cb) { - cb = cb || noop; - if (inited) { - // Once we've determined that we're initialized, we can never become uninitialized again - cb(true); - } - else if (checked_init) { - // We don't want to hit Redis for this question more than once; if we've already checked there - // and it wasn't populated, we'll continue to say we're uninited until init() has been called - cb(false); - } - else { - client.exists(items_key, function(err, obj) { - if (!err && obj) { - inited = true; - } - checked_init = true; - cb(inited); - }); - } - }; - - store.close = function() { - client.quit(); - if (cache_ttl) { - cache.close(); - } - }; - - return store; - }; -} - -module.exports = RedisStore; \ No newline at end of file From 2d7fb5c708b7fee25522176cc778be0e1e9d9fc8 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 2 Feb 2018 10:55:04 -0800 Subject: [PATCH 18/21] feature store unit tests --- test/feature_store-test.js | 8 ++ test/feature_store_test_base.js | 147 +++++++++++++++++++++++++++++++ test/redis_feature_store-test.js | 9 ++ 3 files changed, 164 insertions(+) create mode 100644 test/feature_store-test.js create mode 100644 test/feature_store_test_base.js create mode 100644 test/redis_feature_store-test.js diff --git a/test/feature_store-test.js b/test/feature_store-test.js new file 
mode 100644 index 0000000..4ca19ce --- /dev/null +++ b/test/feature_store-test.js @@ -0,0 +1,8 @@ +var InMemoryFeatureStore = require('../feature_store'); +var allFeatureStoreTests = require('./feature_store_test_base'); + +describe('InMemoryFeatureStore', function() { + allFeatureStoreTests(function() { + return new InMemoryFeatureStore(); + }) +}); diff --git a/test/feature_store_test_base.js b/test/feature_store_test_base.js new file mode 100644 index 0000000..981da51 --- /dev/null +++ b/test/feature_store_test_base.js @@ -0,0 +1,147 @@ +var dataKind = require('../versioned_data_kind'); + +function allFeatureStoreTests(makeStore) { + var feature1 = { + key: 'foo', + version: 10 + }; + var feature2 = { + key: 'bar', + version: 10 + }; + + function initedStore(cb) { + var store = makeStore(); + var initData = {}; + initData[dataKind.features.namespace] = { + 'foo': feature1, + 'bar': feature2 + }; + store.init(initData, function() { + cb(store); + }); + } + + it('is initialized after calling init()', function(done) { + initedStore(function(store) { + store.initialized(function(result) { + expect(result).toBe(true); + done(); + }); + }); + }); + + it('gets existing feature', function(done) { + initedStore(function(store) { + store.get(dataKind.features, feature1.key, function(result) { + expect(result).toEqual(feature1); + done(); + }); + }); + }); + + it('does not get nonexisting feature', function(done) { + initedStore(function(store) { + store.get(dataKind.features, 'biz', function(result) { + expect(result).toBe(null); + done(); + }); + }); + }); + + it('gets all features', function(done) { + initedStore(function(store) { + store.all(dataKind.features, function(result) { + expect(result).toEqual({ + 'foo': feature1, + 'bar': feature2 + }); + done(); + }); + }); + }); + + it('upserts with newer version', function(done) { + var newVer = { key: feature1.key, version: feature1.version + 1 }; + initedStore(function(store) { + store.upsert(dataKind.features, newVer, function(result) { + store.get(dataKind.features, feature1.key, function(result) { + expect(result).toEqual(newVer); + done(); + }); + }); + }); + }); + + it('does not upsert with older version', function(done) { + var oldVer = { key: feature1.key, version: feature1.version - 1 }; + initedStore(function(store) { + store.upsert(dataKind.features, oldVer, function(result) { + store.get(dataKind.features, feature1.key, function(result) { + expect(result).toEqual(feature1); + done(); + }); + }); + }); + }); + + it('upserts new feature', function(done) { + var newFeature = { key: 'biz', version: 99 }; + initedStore(function(store) { + store.upsert(dataKind.features, newFeature, function(result) { + store.get(dataKind.features, newFeature.key, function(result) { + expect(result).toEqual(newFeature); + done(); + }); + }); + }); + }); + + it('deletes with newer version', function(done) { + initedStore(function(store) { + store.delete(dataKind.features, feature1.key, feature1.version + 1, function(result) { + store.get(dataKind.features, feature1.key, function(result) { + expect(result).toBe(null); + done(); + }); + }); + }); + }); + + it('does not delete with older version', function(done) { + initedStore(function(store) { + store.delete(dataKind.features, feature1.key, feature1.version - 1, function(result) { + store.get(dataKind.features, feature1.key, function(result) { + expect(result).not.toBe(null); + done(); + }); + }); + }); + }); + + it('allows deleting unknown feature', function(done) { + initedStore(function(store) { + 
store.delete(dataKind.features, 'biz', 99, function(result) { + store.get(dataKind.features, 'biz', function(result) { + expect(result).toBe(null); + done(); + }); + }); + }); + }); + + it('does not upsert older version after delete', function(done) { + initedStore(function(store) { + store.delete(dataKind.features, feature1.key, feature1.version + 1, function(result) { + store.upsert(dataKind.features, feature1, function(result) { + store.get(dataKind.features, feature1.key, function(result) { + expect(result).toBe(null); + done(); + }); + }); + }); + }); + }); +} + +module.exports = allFeatureStoreTests; diff --git a/test/redis_feature_store-test.js b/test/redis_feature_store-test.js new file mode 100644 index 0000000..5d16e0e --- /dev/null +++ b/test/redis_feature_store-test.js @@ -0,0 +1,9 @@ +var RedisFeatureStore = require('../redis_feature_store'); +var allFeatureStoreTests = require('./feature_store_test_base'); + +describe('RedisFeatureStore', function() { + allFeatureStoreTests(function() { + redisOpts = { url: 'redis://localhost:6379' }; + return new RedisFeatureStore(redisOpts, 30000); + }) +}); From 10623acce4013a4512deb3ceec7490e5e6df1dd5 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 2 Feb 2018 11:43:45 -0800 Subject: [PATCH 19/21] comment --- versioned_data_kind.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/versioned_data_kind.js b/versioned_data_kind.js index 68876f4..21834ca 100644 --- a/versioned_data_kind.js +++ b/versioned_data_kind.js @@ -4,6 +4,9 @@ referenced in the API. If we add another storable data type in the future, as long as it follows the same pattern (having "key", "version", and "deleted" properties), we only need to add a corresponding constant here and the existing store should be able to handle it. + + Note, for things to work correctly, the "namespace" property must match the key used in + module.exports. */ var features = { From 0bce620579ee7883bc6d9c5668c7e7ca0fa113b6 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 2 Feb 2018 12:32:12 -0800 Subject: [PATCH 20/21] fix tests to be async --- test/evaluate_flag-test.js | 125 +++++++++++++++++++++++-------------- 1 file changed, 77 insertions(+), 48 deletions(-) diff --git a/test/evaluate_flag-test.js b/test/evaluate_flag-test.js index db13d2c..e0f22f0 100644 --- a/test/evaluate_flag-test.js +++ b/test/evaluate_flag-test.js @@ -4,21 +4,18 @@ var dataKind = require('../versioned_data_kind'); var featureStore = new InMemoryFeatureStore(); -function defineSegment(segment) { +function defineSegment(segment, cb) { var data = {}; data[dataKind.segments.namespace] = {}; data[dataKind.segments.namespace][segment.key] = segment; featureStore.init(data); - var result = featureStore.get(dataKind.segments, segment.key); + setTimeout(cb, 0); } -function evalBooleanFlag(flag, user) { - var gotResult; +function evalBooleanFlag(flag, user, cb) { evaluate.evaluate(flag, user, featureStore, function(err, result) { - // the in-memory store isn't really async - we can count on receiving this callback before we return. 
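
The comment being removed above is exactly the assumption patch 20 retires: it holds only because InMemoryFeatureStore happens to invoke its callbacks on the same tick, and the store interface makes no such guarantee. Against a genuinely asynchronous store (such as the Redis one), the callback fires after the function has already returned, so the synchronous pattern yields undefined. A sketch of that failure mode (illustrative; asyncStore stands in for any store that defers its callbacks):

    function evalBooleanFlagSync(flag, user) {
      var gotResult;
      evaluate.evaluate(flag, user, asyncStore, function(err, result) {
        gotResult = result;  // runs on a later tick for an async store
      });
      return gotResult;      // undefined: returns before the callback fires
    }

Hence the rewrite that follows, which threads the test framework's done callback through every expectation.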
- gotResult = result; - }) - return gotResult; + cb(result); + }); } function makeFlagWithSegmentMatch(segment) { @@ -50,56 +47,72 @@ function makeFlagWithSegmentMatch(segment) { describe('evaluate', function() { - it('matches segment with explicitly included user', function() { + it('matches segment with explicitly included user', function(done) { var segment = { key: 'test', included: [ 'foo' ], version: 1 }; - defineSegment(segment); - var flag = makeFlagWithSegmentMatch(segment); - var user = { key: 'foo' }; - expect(evalBooleanFlag(flag, user)).toBe(true); + defineSegment(segment, function() { + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo' }; + evaluate.evaluate(flag, user, featureStore, function(err, result) { + expect(result).toBe(true); + done(); + }); + }); }); - it('does not match segment with explicitly excluded user', function() { + it('does not match segment with explicitly excluded user', function(done) { var segment = { key: 'test', excluded: [ 'foo' ], version: 1 }; - defineSegment(segment); - var flag = makeFlagWithSegmentMatch(segment); - var user = { key: 'foo' }; - expect(evalBooleanFlag(flag, user)).toBe(false); + defineSegment(segment, function() { + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo' }; + evaluate.evaluate(flag, user, featureStore, function(err, result) { + expect(result).toBe(false); + done(); + }); + }); }); - it('does not match segment with unknown user', function() { + it('does not match segment with unknown user', function(done) { var segment = { key: 'test', included: [ 'foo' ], version: 1 }; - defineSegment(segment); - var flag = makeFlagWithSegmentMatch(segment); - var user = { key: 'bar' }; - expect(evalBooleanFlag(flag, user)).toBe(false); + defineSegment(segment, function() { + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'bar' }; + evaluate.evaluate(flag, user, featureStore, function(err, result) { + expect(result).toBe(false); + done(); + }); + }); }); - it('matches segment with user who is both included and excluded', function() { + it('matches segment with user who is both included and excluded', function(done) { var segment = { key: 'test', included: [ 'foo' ], excluded: [ 'foo' ], version: 1 }; - defineSegment(segment); - var flag = makeFlagWithSegmentMatch(segment); - var user = { key: 'foo' }; - expect(evalBooleanFlag(flag, user)).toBe(true); + defineSegment(segment, function() { + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo' }; + evaluate.evaluate(flag, user, featureStore, function(err, result) { + expect(result).toBe(true); + done(); + }); + }); }); - it('matches segment with rule with full rollout', function() { + it('matches segment with rule with full rollout', function(done) { var segment = { key: 'test', rules: [ @@ -116,13 +129,17 @@ describe('evaluate', function() { ], version: 1 }; - defineSegment(segment); - var flag = makeFlagWithSegmentMatch(segment); - var user = { key: 'foo', email: 'test@example.com' }; - expect(evalBooleanFlag(flag, user)).toBe(true); + defineSegment(segment, function() { + var flag = makeFlagWithSegmentMatch(segment); + var user = { key: 'foo', email: 'test@example.com' }; + evaluate.evaluate(flag, user, featureStore, function(err, result) { + expect(result).toBe(true); + done(); + }); + }); }); - it('does not match segment with rule with zero rollout', function() { + it('does not match segment with rule with zero rollout', function(done) { var segment = { key: 'test', rules: [ @@ -139,13 +156,17 @@ 
describe('evaluate', function() {
       ],
       version: 1
     };
-    defineSegment(segment);
-    var flag = makeFlagWithSegmentMatch(segment);
-    var user = { key: 'foo', email: 'test@example.com' };
-    expect(evalBooleanFlag(flag, user)).toBe(false);
+    defineSegment(segment, function() {
+      var flag = makeFlagWithSegmentMatch(segment);
+      var user = { key: 'foo', email: 'test@example.com' };
+      evaluate.evaluate(flag, user, featureStore, function(err, result) {
+        expect(result).toBe(false);
+        done();
+      });
+    });
   });
 
-  it('matches segment with multiple matching clauses', function() {
+  it('matches segment with multiple matching clauses', function(done) {
     var segment = {
       key: 'test',
       rules: [
@@ -166,13 +187,17 @@ describe('evaluate', function() {
       ],
       version: 1
     };
-    defineSegment(segment);
-    var flag = makeFlagWithSegmentMatch(segment);
-    var user = { key: 'foo', email: 'test@example.com', name: 'bob' };
-    expect(evalBooleanFlag(flag, user)).toBe(true);
+    defineSegment(segment, function() {
+      var flag = makeFlagWithSegmentMatch(segment);
+      var user = { key: 'foo', email: 'test@example.com', name: 'bob' };
+      evaluate.evaluate(flag, user, featureStore, function(err, result) {
+        expect(result).toBe(true);
+        done();
+      });
+    });
   });
 
-  it('does not match segment if one clause does not match', function() {
+  it('does not match segment if one clause does not match', function(done) {
     var segment = {
       key: 'test',
       rules: [
@@ -193,9 +218,13 @@ describe('evaluate', function() {
       ],
       version: 1
     };
-    defineSegment(segment);
-    var flag = makeFlagWithSegmentMatch(segment);
-    var user = { key: 'foo', email: 'test@example.com', name: 'bob' };
-    expect(evalBooleanFlag(flag, user)).toBe(false);
+    defineSegment(segment, function() {
+      var flag = makeFlagWithSegmentMatch(segment);
+      var user = { key: 'foo', email: 'test@example.com', name: 'bob' };
+      evaluate.evaluate(flag, user, featureStore, function(err, result) {
+        expect(result).toBe(false);
+        done();
+      });
+    });
   });
 });

From 543f73d9b705fdf15fe4a79afff60b59a14d0543 Mon Sep 17 00:00:00 2001
From: Eli Bishop
Date: Wed, 21 Feb 2018 12:13:00 -0800
Subject: [PATCH 21/21] version 4.0.0

---
 CHANGELOG.md | 7 +++++++
 package.json | 2 +-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d52b6a3..29f7a92 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,13 @@
 All notable changes to the LaunchDarkly Node.js SDK will be documented in this file.
 This project adheres to [Semantic Versioning](http://semver.org).
 
+## [4.0.0] - 2018-02-21
+### Added
+- Support for a new LaunchDarkly feature: reusable user segments.
+
+### Changed
+- The feature store interface has been changed to support user segment data as well as feature flags. Existing code that uses `RedisFeatureStore` should work as before, but custom feature store implementations will need to be updated.
+
 ## [3.4.0] - 2018-02-13
 ### Added
 - Adds support for a future LaunchDarkly feature, coming soon: semantic version user attributes.
diff --git a/package.json b/package.json
index c293b9a..2ac4ca3 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "ldclient-node",
-  "version": "3.4.0",
+  "version": "4.0.0",
   "description": "LaunchDarkly SDK for Node.js",
   "main": "index.js",
   "scripts": {
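
The CHANGELOG entry above notes that custom feature store implementations need updating for 4.0.0. Collecting the method shapes used throughout this series (from feature_store.js and redis_feature_store.js), the kind-aware contract a custom store would have to satisfy looks roughly like the sketch below; this is a summary of the interface as implemented in these patches, not separate documentation, and MyCustomFeatureStore is a placeholder name.

    var dataKind = require('./versioned_data_kind');

    function MyCustomFeatureStore() {
      var store = {};
      // Every method takes a `kind` (dataKind.features or dataKind.segments),
      // except init(), whose payload is keyed by kind.namespace.
      store.get = function(kind, key, cb) { cb(null); };          // cb(item), or cb(null) if missing/deleted
      store.all = function(kind, cb) { cb({}); };                 // cb(map of key -> undeleted item)
      store.init = function(allData, cb) { cb(); };               // allData[kind.namespace] = { key: item, ... }
      store.upsert = function(kind, item, cb) { cb(); };          // no-op if stored version >= item.version
      store.delete = function(kind, key, version, cb) { cb(); };  // writes a versioned tombstone
      store.initialized = function(cb) { cb(false); };            // cb(boolean)
      store.close = function() {};                                // release any connections or caches
      return store;
    }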