From 9fa75def4d665b58e7143925ca637c93915563be Mon Sep 17 00:00:00 2001 From: James Roper Date: Wed, 24 Jun 2020 13:09:21 +1000 Subject: [PATCH 1/2] Added metadata support to JavaScript. --- node-support/index.js | 1 + node-support/src/cloudevents.js | 75 +++++ node-support/src/command-helper.js | 92 ++---- node-support/src/context-failure.js | 25 ++ node-support/src/crdt-support.js | 15 +- node-support/src/effect-serializer.js | 82 +++++ node-support/src/metadata.js | 194 ++++++++++++ node-support/src/stateless-support.js | 416 ++++++++++++++++++++------ node-support/src/stateless.js | 47 +-- 9 files changed, 748 insertions(+), 199 deletions(-) create mode 100644 node-support/src/cloudevents.js create mode 100644 node-support/src/context-failure.js create mode 100644 node-support/src/effect-serializer.js create mode 100644 node-support/src/metadata.js diff --git a/node-support/index.js b/node-support/index.js index 7971e2501..2ecd9a2b8 100644 --- a/node-support/index.js +++ b/node-support/index.js @@ -24,3 +24,4 @@ module.exports.CloudState = require("./src/cloudstate"); module.exports.EventSourced = require("./src/eventsourced"); module.exports.crdt = require("./src/crdt"); module.exports.Stateless = require("./src/stateless"); +module.exports.Metadata = require("./src/metadata"); diff --git a/node-support/src/cloudevents.js b/node-support/src/cloudevents.js new file mode 100644 index 000000000..a0a02d19f --- /dev/null +++ b/node-support/src/cloudevents.js @@ -0,0 +1,75 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * CloudEvent data. + * + * @interface module:cloudstate.CloudEvent + * @property {string} specversion The CloudEvent spec version + */ +function toCloudevent(metadata) { + return { + get specversion() { + return metadata["ce-specversion"]; + }, + get id() { + return metadata["ce-id"]; + }, + set id(id) { + metadata["ce-id"] = id; + }, + get source() { + return metadata["ce-source"]; + }, + set source(source) { + metadata["ce-source"] = source; + }, + get type() { + return metadata["ce-type"]; + }, + set type(type) { + metadata["ce-type"] = type; + }, + get datacontenttype() { + return metadata["Content-Type"]; + }, + set datacontenttype(datacontenttype) { + metadata["Content-Type"] = datacontentype; + }, + get dataschema() { + return metadata["ce-dataschema"]; + }, + set dataschema(dataschema) { + metadata["ce-dataschema"] = dataschema; + }, + get subject() { + return metadata["ce-subject"]; + }, + set subject(subject) { + metadata["ce-subject"] = subject; + }, + get time() { + return metadata["ce-time"]; + }, + set time(time) { + metadata["ce-time"] = time; + }, + }; +} + +module.exports = { + toCloudevent +}; \ No newline at end of file diff --git a/node-support/src/command-helper.js b/node-support/src/command-helper.js index 4c8ffa53e..86860a768 100644 --- a/node-support/src/command-helper.js +++ b/node-support/src/command-helper.js @@ -15,16 +15,10 @@ */ const AnySupport = require("./protobuf-any"); - -class ContextFailure extends Error { - constructor(msg) { - super(msg); - if (Error.captureStackTrace) { - Error.captureStackTrace(this, ContextFailure); - } - this.name = "ContextFailure"; - } -} +const EffectSerializer = require("./effect-serializer"); +const ContextFailure = require("./context-failure"); +const Metadata = require("./metadata"); +const CloudEvents = require("./cloudevents"); /** * Creates the base for context objects. @@ -37,7 +31,7 @@ module.exports = class CommandHelper { this.service = service; this.streamId = streamId; this.call = call; - this.allEntities = allEntities; + this.effectSerializer = new EffectSerializer(allEntities); this.debug = debug; this.handlerFactory = handlerFactory; } @@ -49,7 +43,12 @@ module.exports = class CommandHelper { * @private */ handleCommand(command) { - const ctx = this.createContext(command.id); + let metadata = new Metadata([]); + if (command.metadata && command.metadata.entries) { + metadata = new Metadata(command.metadata.entries); + } + + const ctx = this.createContext(command.id, metadata); if (!this.service.methods.hasOwnProperty(command.name)) { ctx.commandDebug("Command '%s' unknown", command.name); @@ -150,7 +149,10 @@ module.exports = class CommandHelper { } else if (userReply !== undefined) { ctx.reply.clientAction = { reply: { - payload: AnySupport.serialize(grpcMethod.resolvedResponseType.create(userReply), false, false) + payload: AnySupport.serialize(grpcMethod.resolvedResponseType.create(userReply), false, false), + metadata: { + entries: ctx.replyMetadata.entries + } } }; ctx.commandDebug("%s reply with type [%s] with %d side effects.", desc, ctx.reply.clientAction.reply.payload.type_url, ctx.effects.length); @@ -169,7 +171,11 @@ module.exports = class CommandHelper { this.debug("%s [%s] (%s) - " + msg, ...[this.streamId, this.entityId].concat(args)); } - createContext(commandId) { + // This creates the context. Note that the context has two levels, first is the internal implementation context, + // this has everything the CRDT and EventSourced support needs to do its stuff, it's where effects and metadata + // are recorded, etc. The second is the user facing context, which is a property on the internal context called + // "context". + createContext(commandId, metadata) { const accessor = {}; accessor.commandDebug = (msg, ...args) => { @@ -186,6 +192,7 @@ module.exports = class CommandHelper { }; accessor.error = null; accessor.forward = null; + accessor.replyMetadata = new Metadata([]); /** * Effect context. @@ -193,6 +200,9 @@ module.exports = class CommandHelper { * @interface module:cloudstate.EffectContext * @property {string} entityId The id of the entity that the command is for. * @property {Long} commandId The id of the command. + * @property {module:cloudstate.Metadata} metadata The metadata associated with the command. + * @property {module:cloudstate.CloudEvent} cloudevent The CloudEvents metadata associated with the command. + * @property {module:cloudstate.Metadata} replyMetadata The metadata to send with a reply. */ /** @@ -204,6 +214,9 @@ module.exports = class CommandHelper { accessor.context = { entityId: this.entityId, commandId: commandId, + metadata: metadata, + cloudevent: CloudEvents.toCloudevent(metadata.getMap), + replyMetadata: accessor.replyMetadata, /** * Emit an effect after processing this command. @@ -212,10 +225,11 @@ module.exports = class CommandHelper { * @param method The entity service method to invoke. * @param {object} message The message to send to that service. * @param {boolean} synchronous Whether the effect should be execute synchronously or not. + * @param {module:cloudstate.Metadata} metadata Metadata to send with the effect. */ - effect: (method, message, synchronous = false) => { + effect: (method, message, synchronous = false, metadata) => { accessor.ensureActive(); - accessor.effects.push(this.serializeSideEffect(method, message, synchronous)) + accessor.effects.push(this.effectSerializer.serializeSideEffect(method, message, synchronous, metadata)) }, /** @@ -224,10 +238,11 @@ module.exports = class CommandHelper { * @function module:cloudstate.CommandContext#thenForward * @param method The entity service method to invoke. * @param {object} message The message to send to that service. + * @param {module:cloudstate.Metadata} metadata Metadata to send with the forward. */ - thenForward: (method, message) => { + thenForward: (method, message, metadata) => { accessor.ensureActive(); - accessor.forward = this.serializeEffect(method, message); + accessor.forward = this.effectSerializer.serializeEffect(method, message, metadata); }, /** @@ -249,45 +264,4 @@ module.exports = class CommandHelper { }; return accessor; } - - serializeEffect(method, message) { - let serviceName, commandName; - // We support either the grpc method, or a protobufjs method being passed - if (typeof method.path === "string") { - const r = new RegExp("^/([^/]+)/([^/]+)$").exec(method.path); - if (r == null) { - throw new Error(util.format("Not a valid gRPC method path '%s' on object '%o'", method.path, method)); - } - serviceName = r[1]; - commandName = r[2]; - } else if (method.type === "rpc") { - serviceName = method.parent.name; - commandName = method.name; - } - - const service = this.allEntities[serviceName]; - - if (service !== undefined) { - const command = service.methods[commandName]; - if (command !== undefined) { - const payload = AnySupport.serialize(command.resolvedRequestType.create(message), false, false); - return { - serviceName: serviceName, - commandName: commandName, - payload: payload - }; - } else { - throw new Error(util.format("Command [%s] unknown on service [%s].", commandName, serviceName)) - } - } else { - throw new Error(util.format("Service [%s] has not been registered as an entity in this user function, and so can't be used as a side effect or forward.", service)) - } - } - - serializeSideEffect(method, message, synchronous) { - const msg = this.serializeEffect(method, message); - msg.synchronous = synchronous; - return msg; - } - }; \ No newline at end of file diff --git a/node-support/src/context-failure.js b/node-support/src/context-failure.js new file mode 100644 index 000000000..466a3f189 --- /dev/null +++ b/node-support/src/context-failure.js @@ -0,0 +1,25 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +module.exports = class ContextFailure extends Error { + constructor(msg) { + super(msg); + if (Error.captureStackTrace) { + Error.captureStackTrace(this, ContextFailure); + } + this.name = "ContextFailure"; + } +}; diff --git a/node-support/src/crdt-support.js b/node-support/src/crdt-support.js index 7eadabd53..39ccec5f3 100644 --- a/node-support/src/crdt-support.js +++ b/node-support/src/crdt-support.js @@ -19,10 +19,11 @@ const debug = require("debug")("cloudstate-crdt"); const util = require("util"); const grpc = require("grpc"); const protoLoader = require("@grpc/proto-loader"); -const protoHelper = require("./protobuf-helper") +const protoHelper = require("./protobuf-helper"); const AnySupport = require("./protobuf-any"); const crdts = require("./crdts"); const CommandHelper = require("./command-helper"); +const Metadata = require("./metadata"); class CrdtServices { constructor() { @@ -214,7 +215,8 @@ class CrdtHandler { this.subscribers.set(ctx.commandId.toString(), { commandId: ctx.commandId, handler: handler, - grpcMethod: grpcMethod + grpcMethod: grpcMethod, + metadata: ctx.context.metadata }); ctx.subscribed = true; } @@ -402,7 +404,7 @@ class CrdtHandler { * @interface module:cloudstate.crdt.StateChangedContext * @extends module:cloudstate.CommandContext */ - const ctx = this.commandHelper.createContext(subscriber.commandId); + const ctx = this.commandHelper.createContext(subscriber.commandId, subscriber.metadata); /** * The CRDT. @@ -457,6 +459,11 @@ class CrdtHandler { handleStreamCancelled(cancelled) { const subscriberKey = cancelled.id.toString(); + const subscriber = this.subscribers.get(subscriberKey); + let metadata = new Metadata([]); + if (subscriber && subscriber.metadata) { + metadata = subscriber.metadata; + } this.subscribers.delete(subscriberKey); if (this.cancelledCallbacks.has(subscriberKey)) { @@ -471,7 +478,7 @@ class CrdtHandler { */ - const ctx = this.commandHelper.createContext(cancelled.id); + const ctx = this.commandHelper.createContext(cancelled.id, metadata); ctx.reply = { commandId: cancelled.id }; diff --git a/node-support/src/effect-serializer.js b/node-support/src/effect-serializer.js new file mode 100644 index 000000000..637a503d1 --- /dev/null +++ b/node-support/src/effect-serializer.js @@ -0,0 +1,82 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const AnySupport = require("./protobuf-any"); +const util = require("util"); + +module.exports = class EffectSerializer { + + constructor(allEntities) { + this.allEntities = allEntities; + } + + serializeEffect(method, message, metadata) { + let serviceName, commandName; + // We support either the grpc method, or a protobufjs method being passed + if (typeof method.path === "string") { + const r = new RegExp("^/([^/]+)/([^/]+)$").exec(method.path); + if (r == null) { + throw new Error(util.format("Not a valid gRPC method path '%s' on object '%o'", method.path, method)); + } + serviceName = r[1]; + commandName = r[2]; + } else if (method.type === "rpc") { + serviceName = this.fullName(method.parent); + commandName = method.name; + } + + const service = this.allEntities[serviceName]; + + if (service !== undefined) { + const command = service.methods[commandName]; + if (command !== undefined) { + const payload = AnySupport.serialize(command.resolvedRequestType.create(message), false, false); + const effect = { + serviceName: serviceName, + commandName: commandName, + payload: payload + }; + + if (metadata && metadata.entries) { + effect.metadata = { + entries: metadata.entries + } + } + + return effect; + } else { + throw new Error(util.format("Command [%s] unknown on service [%s].", commandName, serviceName)) + } + } else { + throw new Error(util.format("Service [%s] has not been registered as an entity in this user function, and so can't be used as a side effect or forward.", service)) + } + } + + fullName(item) { + if (item.parent && item.parent.name !== "") { + return this.fullName(item.parent) + "." + item.name; + } else { + return item.name; + } + } + + serializeSideEffect(method, message, synchronous) { + const msg = this.serializeEffect(method, message); + msg.synchronous = synchronous; + return msg; + } + +}; \ No newline at end of file diff --git a/node-support/src/metadata.js b/node-support/src/metadata.js new file mode 100644 index 000000000..942610169 --- /dev/null +++ b/node-support/src/metadata.js @@ -0,0 +1,194 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const CloudEvents = require("./cloudevents"); + +function valueFromEntry(entry) { + if (entry.bytesValue !== undefined) { + return entry.bytesValue; + } else { + return entry.stringValue; + } +} + +/** + * A metadata value. Can either be a string or a buffer. + * + * @typedef module:cloudstate.MetadataValue + * @type {string|Buffer} + */ + +/** + * @classdesc Cloudstate metadata. + * + * Metadata is treated as case insensitive on lookup, and case sensitive on set. Multiple values per key are supported, + * setting a value will add it to the current values for that key. You should delete first if you wish to replace a + * value. + * + * Values can either by strings or byte buffers. If a non string or byte buffer value is set, it will be converted to + * a string using toString. + * + * @interface module:cloudstate.Metadata + * @param {array} entries The list of entries + */ +function Metadata(entries) { + if (entries) { + this.entries = entries; + } else { + this.entries = []; + } + + /** + * The metadata expressed as an object. + * + * The object keys are case insensitive, ie, `metadata.foo` and `metadata.Foo` both return the same value. If there + * are multiple values for a given key, the first one set for that key will be returned. Setting a value will add it + * to the list of existing values for that key. + * + * @name module:cloudstate.Metadata#getMap + * @type {Object} + */ + this.getMap = new Proxy({}, { + get: (target, key) => { + for (const idx in entries) { + const entry = entries[idx]; + if (key.toLowerCase() === entry.key.toLowerCase()) { + return valueFromEntry(entry); + } + } + }, + set: (target, key, value) => { + this.set(key, value) + }, + deleteProperty: (target, key) => this.delete(key), + ownKeys: (target) => { + const keys = []; + entries.forEach(entry => { + keys.push(entry.key); + }); + return keys; + }, + has: (target, key) => this.has(key), + defineProperty: () => { + throw new Error("Metadata.getMap does not support defining properties"); + }, + getOwnPropertyDescriptor: (target, key) => { + const value = this.get(key); + return value ? { + value: value, + writable: true, + enumerable: true, + configurable: true + } : undefined; + } + }); + + /** + * Get all the values for the given key. + * + * The key is case insensitive. + * + * @function module:cloudstate.Metadata#get + * @param {string} key The key to get. + * @returns {Array} All the values, or an empty array if no values exist for the key. + */ + this.get = key => { + const values = []; + entries.forEach(entry => { + if (key.toLowerCase() === entry.key.toLowerCase()) { + values.push(valueFromEntry(entry)); + } + }); + return values; + }; + + /** + * Set a given key value. + * + * This will append the key value to the metadata, it won't replace any existing values for existing keys. + * + * @function module:cloudstate.Metadata#set + * @param {string} key The key to set. + * @param {module:cloudstate.MetadataValue} value The value to set. + */ + this.set = (key, value) => { + const entry = {key}; + if (typeof value === "string") { + entry.stringValue = value; + } else if (Buffer.isBuffer(value)) { + entry.bytesValue = value; + } else { + entry.stringValue = value.toString(); + } + entries.push(entry); + }; + + /** + * Delete all values with the given key. + * + * The key is case insensitive. + * + * @function module:cloudstate.Metadata#delete + * @param {string} key The key to delete. + */ + this.delete = key => { + let idx = 0; + while (idx < entries.length) { + const entry = entries[idx]; + if (key.toLowerCase() !== entry.key.toLowerCase()) { + idx++; + } else { + entries.splice(idx, 1); + } + } + }; + + /** + * Whether there exists a metadata value for the given key. + * + * The key is case insensitive. + * + * @function module:cloudstate.Metadata#has + * @param {string} key The key to check. + */ + this.has = key => { + for (const idx in entries) { + const entry = entries[idx]; + if (key.toLowerCase() === entry.key.toLowerCase()) { + return true; + } + } + }; + + /** + * Clear the metadata. + * + * @function module:cloudstate.Metadata#clear + */ + this.clear = () => { + entries.splice(0, entries.length); + }; + + /** + * The metadata, expressed as a CloudEvent. + * + * @name module:cloudstate.Metadata#cloudevent + * @type {module:cloudstate.CloudEvent} + */ + this.cloudevent = CloudEvents.toCloudevent(this.getMap); +} + +module.exports = Metadata; \ No newline at end of file diff --git a/node-support/src/stateless-support.js b/node-support/src/stateless-support.js index 1893cc305..b2f013da2 100644 --- a/node-support/src/stateless-support.js +++ b/node-support/src/stateless-support.js @@ -17,33 +17,293 @@ const path = require("path"); const grpc = require("grpc"); const protoLoader = require("@grpc/proto-loader"); -const debug = require("debug")("cloudstate-event-sourcing"); +const debug = require("debug")("cloudstate-stateless"); // Bind to stdout debug.log = console.log.bind(console); const AnySupport = require("./protobuf-any"); +const EffectSerializer = require("./effect-serializer"); +const Metadata = require("./metadata"); +const CloudEvents = require("./cloudevents"); class StatelessSupport { - - constructor(root, service, handlers, allEntities) { + constructor(root, service, commandHandlers, allEntities) { this.root = root; - this.service = service; - this.anySupport = new AnySupport(this.root); - this.commandHandlers = handlers.commandHandlers; - this.allEntities = allEntities; + this.service = service; + this.commandHandlers = commandHandlers; + this.anySupport = new AnySupport(this.root); + this.effectSerializer = new EffectSerializer(allEntities); } +} + +class StatelessHandler { - serialize(obj, requireJsonType) { - return AnySupport.serialize(obj, this.options.serializeAllowPrimitives, this.options.serializeFallbackToJson, requireJsonType); - //return AnySupport.serialize(obj, true, false, requireJsonType); + constructor(support, grpcMethod, commandHandler, call, grpcCallback, metadata) { + this.support = support; + this.grpcMethod = grpcMethod; + this.commandHandler = commandHandler; + this.call = call; + this.grpcCallback = grpcCallback; + + this.streamId = Math.random().toString(16).substr(2, 7); + this.streamDebug("Started new call"); + this.supportedEvents = []; + this.callbacks = {}; + this.ctx = this.createContext(metadata); } - deserialize(any) { - return this.anySupport.deserialize(any); + streamDebug(msg, ...args) { + debug("%s [%s.%s] - " + msg, ...[this.streamId, this.support.service.name, this.grpcMethod.name].concat(args)); } -} + createContext(metadata) { + const call = this.call; + let metadataObject = new Metadata([]); + if (metadata && metadata.entries) { + metadataObject = new Metadata(metadata.entries); + } + const cloudevent = CloudEvents.toCloudevent(metadataObject.getMap); + const ctx = { + get cancelled() { + return call.cancelled; + }, + get metadata() { + return metadata; + }, + get cloudevent() { + return cloudevent; + } + }; + + ctx.on = (eventType, callback) => { + if (this.supportedEvents.includes(eventType)) { + this.callbacks[eventType] = callback; + } else { + throw new Error("Unknown event type: " + eventType); + } + }; + return ctx; + } + + invokeCallback(eventType, ...args) { + if (this.callbacks.hasOwnProperty(eventType)) { + this.invokeUserCallback(eventType + " event", this.callbacks[eventType], ...args) + } + } + + ensureNotCancelled() { + if (this.call.cancelled) { + throw new Error("Already replied to unary command, cannot interact further.") + } + } + + handleUnary() { + this.setupUnaryOutContext(); + const deserializedCommand = this.grpcMethod.resolvedRequestType.decode(this.call.request.payload.value); + const userReturn = this.invokeUserCallback("command", this.commandHandler, deserializedCommand, this.ctx); + if (userReturn !== undefined) { + if (this.call.cancelled) { + this.streamDebug("Unary command handler for command %s.%s both sent a reply through the context and returned a value, ignoring return value.", this.support.service.name, this.grpcMethod.name) + } else { + if (typeof userReturn.then === "function") { + userReturn.then(this.ctx.write, this.ctx.fail) + } else { + this.ctx.write(userReturn); + } + } + } + } + + handleStreamedIn() { + this.setupUnaryOutContext(); + this.setupStreamedInContext(); + const userReturn = this.invokeUserCallback("command", this.commandHandler, this.ctx); + if (userReturn !== undefined) { + if (this.call.cancelled) { + this.streamDebug("Streamed command handler for command %s.%s both sent a reply through the context and returned a value, ignoring return value.", this.support.service.name, this.grpcMethod.name) + } else { + if (typeof userReturn.then === "function") { + userReturn.then(this.ctx.write, this.ctx.fail) + } else { + this.ctx.write(userReturn); + } + } + } + } + + handleStreamedOut() { + this.setupStreamedOutContext(); + const deserializedCommand = this.grpcMethod.resolvedRequestType.decode(this.call.request.payload.value); + this.invokeUserCallback("command", this.commandHandler, deserializedCommand, this.ctx); + } + + handleStreamed() { + this.setupStreamedInContext(); + this.setupStreamedOutContext(); + this.invokeUserCallback("command", this.commandHandler, this.ctx); + } + + setupUnaryOutContext() { + const effects = []; + + this.ctx.forward = (method, message, metadata) => { + this.ensureNotCancelled(); + this.streamDebug("Forwarding to %s", method); + const forward = this.support.effectSerializer.serializeEffect(method, message, metadata); + this.grpcCallback(null, { + forward: forward, + sideEffects: effects + }); + }; + + this.ctx.write = (message, metadata) => { + this.ensureNotCancelled(); + this.streamDebug("Sending reply"); + const messageProto = this.grpcMethod.resolvedResponseType.fromObject(message); + const replyPayload = AnySupport.serialize(messageProto, false, false, false); + let replyMetadata = null; + if (metadata && metadata.entries) { + replyMetadata = { + entries: metadata.entries + }; + } + this.grpcCallback(null, { + reply: { + payload: replyPayload, + metadata: replyMetadata + }, + sideEffects: effects + }); + }; + + this.ctx.effect = (method, message, synchronous, metadata) => { + this.ensureNotCancelled(); + this.streamDebug("Emitting effect to %s", method); + effects.push(this.support.effectSerializer.serializeEffect(method, message, synchronous, metadata)); + }; + + this.ctx.fail = error => { + this.ensureNotCancelled(); + this.streamDebug("Failing with %s", error); + this.grpcCallback(null, { + failure: { + description: error + }, + }); + }; + } + + setupStreamedOutContext() { + this.supportedEvents.push("cancelled"); + + this.call.on("cancelled", () => { + this.streamDebug("Received stream cancelled"); + this.invokeCallback("cancelled", this.ctx); + }); + + this.ctx.end = () => { + if (this.call.cancelled) { + this.streamDebug("end invoked when already cancelled."); + } else { + this.streamDebug("Ending stream out"); + this.call.end(); + } + }; + + this.ctx.forward = (method, message, metadata) => { + this.ensureNotCancelled(); + this.streamDebug("Forwarding to %s", method); + const forward = this.support.effectSerializer.serializeEffect(method, message, metadata); + this.call.write({ + forward: forward + }); + }; + + this.ctx.write = (message, metadata) => { + this.ensureNotCancelled(); + this.streamDebug("Sending reply"); + const messageProto = this.grpcMethod.resolvedResponseType.fromObject(message); + const replyPayload = AnySupport.serialize(messageProto, false, false, false); + let replyMetadata = null; + if (metadata && metadata.entries) { + replyMetadata = { + entries: metadata.entries + }; + } + this.call.write({ + reply: { + payload: replyPayload, + metadata: replyMetadata + } + }); + }; + this.ctx.effect = (method, message, synchronous, metadata) => { + this.ensureNotCancelled(); + this.streamDebug("Emitting effect to %s", method); + this.call.write({ + sideEffects: [this.support.effectSerializer.serializeSideEffect(method, message, synchronous, metadata)] + }); + }; + this.ctx.fail = error => { + this.ensureNotCancelled(); + this.streamDebug("Failing with %s", error); + this.call.write({ + failure: { + description: error + }, + }); + this.call.end(); + }; + } + + setupStreamedInContext() { + this.supportedEvents.push("data"); + this.supportedEvents.push("end"); + + this.call.on("data", (data) => { + this.streamDebug("Received data in"); + const deserializedCommand = this.grpcMethod.resolvedRequestType.decode(data.payload.value); + this.invokeCallback("data", deserializedCommand, this.ctx); + }); + + this.call.on("end", () => { + this.streamDebug("Received stream end"); + this.invokeCallback("end", this.ctx); + }); + + this.ctx.cancel = () => { + if (this.call.cancelled) { + this.streamDebug("cancel invoked when already cancelled."); + } else { + this.call.cancel(); + } + } + } + + invokeUserCallback(callbackName, callback, ...args) { + try { + return callback.apply(null, args); + } catch (err) { + const error = "Error handling " + callbackName; + this.streamDebug(error); + console.error(err); + if (!this.call.cancelled) { + const failure = { + failure: { + description: error + }, + }; + if (this.grpcCallback != null) { + this.grpcCallback(null, failure); + } else { + this.call.write(failure); + this.call.end(); + } + } + } + } +} module.exports = class StatelessServices { @@ -52,9 +312,8 @@ module.exports = class StatelessServices { } addService(entity, allEntities) { - this.services[entity.serviceName] = new StatelessSupport(entity.root, entity.service, { - commandHandlers: entity.commandHandlers - }, allEntities); + this.services[entity.serviceName] = new StatelessSupport(entity.root, entity.service, + entity.commandHandlers, allEntities); } entityType() { @@ -78,96 +337,67 @@ module.exports = class StatelessServices { handleStreamedIn: this.handleStreamedIn.bind(this), handleStreamedOut: this.handleStreamedOut.bind(this), handleStreamed: this.handleStreamed.bind(this), - }); + }); } - handleStreamed(call){ - call.on("data", data => { - const service = this.services[data.serviceName]; - if (service && service.commandHandlers.hasOwnProperty(data.name)) { - const userStream = { - write: (userData) => { - const grpcReturn = service.service.methods[data.name].resolvedResponseType.fromObject(userData); - const requireJsonType =true; - call.write({ - reply:{ - payload: AnySupport.serialize(grpcReturn, false, false, requireJsonType) - } - }); - }, - end: () => call.end(), - } - // We call this every time and send a way to stream back .. not sure if this is a good way to do things? - service.commandHandlers[data.name](userStream, service.deserialize(data.payload)); - }else{ - console.warn("There is no user function with name: " + data.serviceName + "." + data.name); + createHandler(call, callback, data) { + const service = this.services[data.serviceName]; + if (service && service.service.methods.hasOwnProperty(data.name)) { + if (service.commandHandlers.hasOwnProperty(data.name)) { + return new StatelessHandler(service, service.service.methods[data.name], service.commandHandlers[data.name], call, callback, data.metadata) + } else { + this.reportError("Service call " + data.serviceName + "." + data.name + " not implemented", call, callback) } - }); - call.on("end", () => { - console.debug("stream ended") - }); + } else { + this.reportError("No service call named " + data.serviceName + "." + data.name + " found", call, callback) + } } - handleStreamedOut(call){ - const data = call.request; - const service = this.services[data.serviceName]; - if (service && service.commandHandlers.hasOwnProperty(data.name)) { - const userStream = { - write: (userData) => { - const grpcReturn = service.service.methods[data.name].resolvedResponseType.fromObject(userData); - const requireJsonType =true; - call.write({ - reply:{ - payload: AnySupport.serialize(grpcReturn, false, false, requireJsonType) - } - }); - }, - end: () => call.end() + reportError(error, call, callback) { + console.warn(error); + const failure = { + failure: { + description: error } - service.commandHandlers[data.name](userStream, service.deserialize(data.payload)); - }else{ - console.warn("There is no user function with name: "+data.name, service); - } + }; + if (callback !== null) { + callback(null, failure); + } else { + call.write(failure); + call.end(); + } } - handleStreamedIn(call, callback){ + handleStreamed(call) { call.on("data", data => { - const service = this.services[data.serviceName]; - if (service && service.commandHandlers.hasOwnProperty(data.name)) { - const userReturn = service.commandHandlers[data.name](service.deserialize(data.payload)); - const grpcReturn = service.service.methods[data.name].resolvedResponseType.fromObject(userReturn); - const requireJsonType =true; - callback(null, { - reply:{ - payload: AnySupport.serialize(grpcReturn, false, false, requireJsonType) - } - }); - }else{ - console.warn("There is no user function with name: " + call.request.serviceName); - callback(); + const handler = this.createHandler(call, null, data); + if (handler) { + handler.handleStreamed(); } }); - call.on("end", () => { - console.debug("stream ended") - }); + } + handleStreamedOut(call) { + const handler = this.createHandler(call, null, call.request); + if (handler) { + handler.handleStreamedOut(); + } } - handleUnary(call, callback){ - const service = this.services[call.request.serviceName]; - if (service && service.commandHandlers.hasOwnProperty(call.request.name)) { - const userReturn = service.commandHandlers[call.request.name](service.deserialize(call.request.payload)); - const grpcReturn = service.service.methods[call.request.name].resolvedResponseType.fromObject(userReturn); - const requireJsonType =true; - var metadata = new grpc.Metadata(); - callback(null, { - reply:{ - payload: AnySupport.serialize(grpcReturn, false, false, requireJsonType) - } - }, metadata); - }else{ - console.warn("There is no user function with name: " + call.request.serviceName); - callback(); - } + handleStreamedIn(call, callback) { + call.on("data", data => { + const handler = this.createHandler(call, callback, data); + if (handler) { + handler.handleStreamedIn(); + } + }); + } + + handleUnary(call, callback) { + const handler = this.createHandler(call, callback, call.request); + if (handler) { + handler.handleUnary(); + } } + }; diff --git a/node-support/src/stateless.js b/node-support/src/stateless.js index 7aa8f53c5..6f505afa0 100644 --- a/node-support/src/stateless.js +++ b/node-support/src/stateless.js @@ -24,10 +24,11 @@ const CloudState = require("./cloudstate"); const statelessServices = new StatelessSupport(); /** - * A Stateless. + * A stateless entity * - * @memberOf module:cloudstate + * @namespace module:cloudstate.stateless */ + class Stateless { /** @@ -41,11 +42,7 @@ class Stateless { this.options = { ...{ - persistenceId: "entity", - snapshotEvery: 100, includeDirs: ["."], - serializeAllowPrimitives: false, - serializeFallbackToJson: false }, ...options }; @@ -72,7 +69,7 @@ class Stateless { * * The names of the properties must match the names of the service calls specified in the gRPC descriptor * - * @type {Object.} + * @type {Object.} */ this.commandHandlers = {}; } @@ -92,42 +89,6 @@ class Stateless { return this.root.lookupType(messageType); } - /** - * The initial state callback. - * - * @member module:cloudstate.EventSourced#initial - * @type module:cloudstate.EventSourced~initialCallback - */ - - /** - * Set the initial state callback. - * - * @param {module:cloudstate.EventSourced~initialCallback} callback The initial state callback. - * @return {module:cloudstate.EventSourced} This entity. - */ - setInitial(callback) { - this.initial = callback; - return this; - } - - /** - * The behavior callback. - * - * @member module:cloudstate.EventSourced#behavior - * @type module:cloudstate.EventSourced~behaviorCallback - */ - - /** - * Set the behavior callback. - * - * @param {module:cloudstate.EventSourced~behaviorCallback} callback The behavior callback. - * @return {module:cloudstate.EventSourced} This entity. - */ - setBehavior(callback) { - this.behavior = callback; - return this; - } - register(allEntities) { statelessServices.addService(this, allEntities); return statelessServices; From b5c089cd6a03343117b911a31c2ada9d2045f7e3 Mon Sep 17 00:00:00 2001 From: James Roper Date: Wed, 8 Jul 2020 16:14:19 +1000 Subject: [PATCH 2/2] Update node-support/src/metadata.js Co-authored-by: Peter Vlugter --- node-support/src/metadata.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node-support/src/metadata.js b/node-support/src/metadata.js index 942610169..e5977b8f9 100644 --- a/node-support/src/metadata.js +++ b/node-support/src/metadata.js @@ -38,7 +38,7 @@ function valueFromEntry(entry) { * setting a value will add it to the current values for that key. You should delete first if you wish to replace a * value. * - * Values can either by strings or byte buffers. If a non string or byte buffer value is set, it will be converted to + * Values can either be strings or byte buffers. If a non string or byte buffer value is set, it will be converted to * a string using toString. * * @interface module:cloudstate.Metadata @@ -191,4 +191,4 @@ function Metadata(entries) { this.cloudevent = CloudEvents.toCloudevent(this.getMap); } -module.exports = Metadata; \ No newline at end of file +module.exports = Metadata;