Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions EmberClient/EmberClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class EmberClient extends EventEmitter {
}
}
catch(e) {
winston.debug(e, root);
winston.error(e, root);
if (this._callback) {
this._callback(e);
}
Expand All @@ -88,7 +88,7 @@ class EmberClient extends EventEmitter {
try {
this._makeRequest();
} catch(e) {
winston.debug(e);
winston.error(e);
if (this._callback != null) {
this._callback(e);
}
Expand Down Expand Up @@ -140,8 +140,8 @@ class EmberClient extends EventEmitter {
if (n == null) {
parent.addChild(node);
n = node;
} else {
n.update(node);
} else if (n.update(node)) {
n.updateSubscribers();
}

const children = node.getChildren();
Expand All @@ -165,7 +165,9 @@ class EmberClient extends EventEmitter {
let element = parent.getElementByPath(node.path);
if (element !== null) {
this.emit("value-change", node);
element.update(node);
if (element.update(node)) {
element.updateSubscribers();
}
}
else {
const path = node.path.split(".");
Expand Down Expand Up @@ -344,17 +346,16 @@ class EmberClient extends EventEmitter {
if (nodeElements != null &&
((qnode.isMatrix() && nodeElements.length === 1 && nodeElements[0].getPath() === requestedPath) ||
(!qnode.isMatrix() && nodeElements.every(el => isDirectSubPathOf(el.getPath(), requestedPath))))) {
winston.debug("Received getDirectory response", node);
winston.debug("Received getDirectory response", node);
this._finishRequest();
return resolve(node); // make sure the info is treated before going to next request.
}
else {
winston.debug(node);
winston.debug(new Error(requestedPath));
winston.error(new Error(requestedPath));
}
}
};
winston.debug("Sending getDirectory", qnode);
try {
this._client.sendBERNode(qnode.getDirectory(callback));
}
Expand Down Expand Up @@ -420,6 +421,9 @@ class EmberClient extends EventEmitter {
// We have this part already.
pos++;
if (pos >= pathArray.length) {
if (callback) {
node.getDirectory(callback);
}
return node;
}
currentNode = node;
Expand Down Expand Up @@ -514,7 +518,7 @@ class EmberClient extends EventEmitter {
return;
}
if (error) {
winston.debug("Received getDirectory error", error);
winston.error("Received getDirectory error", error);
this._clearTimeout(); // clear the timeout now. The resolve below may take a while.
this._finishRequest();
reject(error);
Expand Down Expand Up @@ -612,7 +616,7 @@ class EmberClient extends EventEmitter {
resolve(node);
}
};
winston.debug('setValue sending ...', node.getPath(), value);
winston.debug('setValue sending ...', node.getPath(), value);
this._client.sendBERNode(node.setValue(value));
}});
}
Expand Down
22 changes: 0 additions & 22 deletions EmberLib/QualifiedParameter.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,6 @@ class QualifiedParameter extends QualifiedElement {
return r;
}

/**
*
* @param {QualifiedParameter} other
*/
update(other) {
if ((other != null) && (other.contents != null)) {
if (this.contents == null) {
this.contents = other.contents;
}
else {
for (var key in other.contents) {
if (key[0] === "_") {
continue;
}
if (other.contents.hasOwnProperty(key)) {
this.contents[key] = other.contents[key];
}
}
}
}
return;
}

/**
*
Expand Down
102 changes: 102 additions & 0 deletions EmberLib/StreamCollection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
const BER = require("../ber");
const StreamEntry = require("./StreamEntry");

class StreamCollection {
/**
*
*/
constructor() {
/** @type {Map<number,StreamEntry>} */
this.elements = new Map();
}
/**
*
* @param {StreamEntry} entry
*/
addEntry(entry) {
this.elements.set(entry.identifier, entry);
}
/**
*
* @param {StreamEntry} entry
*/
removeEntry(entry) {
this.elements.delete(entry.identifier);
}
/**
*
* @param {number} identifier
* @returns {StreamEntry}
*/
getEntry(identifier) {
return this.elements.get(identifier);
}

/**
* @returns {StreamEntry}
*/
[Symbol.iterator]() {
return this.elements.values();
}

/**
* @retuns {number}
*/
size() {
return this.elements.size;
}

/**
*
* @param {BER.Writer} ber
*/
encode(ber) {
ber.startSequence(StreamCollection.BERID);
for(let [, entry] of this.elements) {
ber.startSequence(BER.CONTEXT(0));
entry.encode(ber);
ber.endSequence();
}
ber.endSequence();
}

/**
* @returns {
* {identifier: number, value: string|number|boolean|Buffer}[]
* }
*/
toJSON() {
const js = [];
for(let [, entry] of this.elements) {
js.push(entry.toJSON());
}
return js;
}

/**
* @returns {number}
*/
static get BERID() {
return BER.APPLICATION(5);
}

/**
*
* @param {BER.ExtendedReader} ber
* @returns {StreamCollection}
*/
static decode(ber) {
const streamCollection = new StreamCollection();
const seq = ber.getSequence(this.BERID);
while (seq.remain > 0) {
const rootReader = seq.getSequence(BER.CONTEXT(0));
while (rootReader.remain > 0) {
const entry = StreamEntry.decode(rootReader);
streamCollection.addEntry(entry);
}
}
return streamCollection;
}
}

module.exports = StreamCollection;
70 changes: 70 additions & 0 deletions EmberLib/StreamEntry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
const BER = require("../ber");
const Errors = require("../Errors");

class StreamEntry {
/**
*
* @param {number} identifier
* @param {string|number|boolean|Buffer} value
*/
constructor(identifier, value ) {
this.identifier = identifier;
this.value = value;
}

/**
*
* @param {BER} ber
*/
encode(ber) {
ber.startSequence(StreamEntry.BERID);
if (this.identifier != null) {
ber.startSequence(BER.CONTEXT(0));
ber.writeInt(this.identifier);
ber.endSequence();
}
if (this.value != null) {
ber.startSequence(BER.CONTEXT(1));
ber.writeValue(this.value);
ber.endSequence();
}
ber.endSequence();
}

/**
* @returns {{
* identifier: number,
* value: string|number
* }}
*/
toJSON() {
return {
identifier: this.identifier,
value: this.value
}
}

static get BERID() {
return BER.APPLICATION(5);
}

static decode(ber) {
const entry = new StreamEntry();
const seq = ber.getSequence(this.BERID);
while(seq.remain > 0) {
const tag = seq.peek();
const data = seq.getSequence(tag);
if(tag == BER.CONTEXT(0)) {
entry.identifier = data.readInt();
} else if(tag == BER.CONTEXT(1)) {
entry.value = data.readValue();
}
else {
throw new Errors.UnimplementedEmberTypeError(tag);
}
}
return entry;
}
}

module.exports = StreamEntry;
7 changes: 3 additions & 4 deletions EmberLib/TreeNode.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ class TreeNode extends ElementInterface {

_isSubscribable(callback) {
return (callback != null &&
((this.isParameter() && this.isStream()) ||
this.isMatrix()));
(this.isParameter() || this.isMatrix()));
}

_subscribe(callback) {
Expand Down Expand Up @@ -236,7 +235,7 @@ class TreeNode extends ElementInterface {
* @param {function} callback
*/
getDirectory(callback) {
if (this._isSubscribable(callback)) {
if (this._isSubscribable(callback) && !this.isStream()) {
this._subscribe(callback);
}
return this.getCommand(new Command(COMMAND_GETDIRECTORY));
Expand Down Expand Up @@ -471,7 +470,7 @@ class TreeNode extends ElementInterface {
* @param {function} callback
*/
subscribe(callback) {
if (this._isSubscribable(callback)) {
if (this._isSubscribable(callback) && this.isStream()) {
this._subscribe(callback);
}
return this.getCommand(new Command(COMMAND_SUBSCRIBE));
Expand Down
9 changes: 7 additions & 2 deletions EmberServer/EmberServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ const EmberLib = require('../EmberLib');
const JSONParser = require("./JSONParser");
const ElementHandlers = require("./ElementHandlers");
const ServerEvents = require("./ServerEvents");
const winston = require("winston");
const Errors = require("../Errors");
const winston = require("winston");

class TreeServer extends EventEmitter{
/**
Expand All @@ -16,7 +16,7 @@ class TreeServer extends EventEmitter{
*/
constructor(host, port, tree) {
super();
this._debug = false;
this._debug = true;
this.timeoutValue = 2000;
this.server = new S101Server(host, port);
this.tree = tree;
Expand Down Expand Up @@ -390,6 +390,7 @@ class TreeServer extends EventEmitter{
setValue(element, value, origin, key) {
return new Promise(resolve => {
// Change the element value if write access permitted.
winston.debug("New Setvalue request");
if (element.contents == null) {
return resolve();
}
Expand All @@ -398,6 +399,7 @@ class TreeServer extends EventEmitter{
(element.contents.access != null) &&
(element.contents.access.value > 1)) {
element.contents.value = value;
winston.debug("New value ", value, "path", element.getPath());
const res = this.getResponse(element);
this.updateSubscribers(element.getPath(),res, origin);
}
Expand Down Expand Up @@ -460,6 +462,7 @@ class TreeServer extends EventEmitter{
*/
updateSubscribers(path, response, origin) {
if (this.subscribers[path] == null) {
winston.debug("No subscribers for", path);
return;
}

Expand All @@ -468,10 +471,12 @@ class TreeServer extends EventEmitter{
continue; // already sent the response to origin
}
if (this.clients.has(client)) {
winston.debug("Sending new value to", client.remoteAddress());
client.queueMessage(response);
}
else {
// clean up subscribers - client is gone
winston.debug("deleting client");
this.subscribers[path].delete(client);
}
}
Expand Down
1 change: 0 additions & 1 deletion EmberServer/QualifiedHandlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class QualifiedHandlers extends MatrixHandlers {
this.server.setValue(element, parameter.contents.value, client);
let res = this.server.getQualifiedResponse(element);
client.sendBERNode(res)
this.server.updateSubscribers(element.getPath(), res, client);
}
}
}
Expand Down
Loading