Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions client-js/main/client/firebase-subscription-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,23 @@ export class FirebaseSubscriptionService {
* @private
*/
_keepUpSubscriptions() {
this._subscriptions.forEach(subscription => {
const spineSubscription = subscription.internal();
if (subscription.closed) {
this._endpoint.cancelSubscription(spineSubscription).then(() => {
this._removeSubscription(subscription);
});
} else {
this._endpoint.keepUpSubscription(spineSubscription).then(response => {
const responseStatus = response.status;
const responseStatusProto = ObjectToProto.convert(responseStatus, Status.typeUrl());
if (responseStatusProto.getStatusCase() !== Status.StatusCase.OK) {
this._removeSubscription(subscription)
}
});
const cancelledSubscriptions = this._subscriptions.filter(s => s.closed);
if (cancelledSubscriptions.length > 0) {
const subscriptionMessages = cancelledSubscriptions.map(s => s.internal())
this._endpoint.cancelAll(subscriptionMessages);
cancelledSubscriptions.forEach(s => this._removeSubscription(s))
}
const subscriptions = this._subscriptions.map(value => value.internal());
if (subscriptions.length === 0) {
return;
}
this._endpoint.keepUpSubscriptions(subscriptions).then(response => {
for (let i = 0; i < response.response.length; i++) {
const r = response.response[i];
const status = ObjectToProto.convert(r.status, Status.typeUrl());
if (status.getStatusCase() !== Status.StatusCase.OK) {
this._removeSubscription(subscriptions[i])
}
}
});
}
Expand Down
148 changes: 119 additions & 29 deletions client-js/main/client/http-endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import {TypedMessage} from './typed-message';
import {ClientError, ConnectionError, ServerError, SpineError} from './errors';
import {Subscriptions} from '../proto/spine/web/keeping_up_pb';

/**
* @typedef {Object} SubscriptionRouting
Expand All @@ -36,8 +37,12 @@ import {ClientError, ConnectionError, ServerError, SpineError} from './errors';
* the name of the subscription creation endpoint; defaults to "/subscription/create"
* @property {string} keepUp
* the name of the subscription keep up endpoint; defaults to "/subscription/keep-up"
* @property {string} keepUpAll
* the name of the subscription bulk keep up endpoint; defaults to "/subscription/keep-up-all"
* @property {string} cancel
* the name of the subscription cancellation endpoint; defaults to "/subscription/cancel"
* @property {string} cancelAll
* the name of the subscription bulk cancellation endpoint; defaults to "/subscription/cancel-all"
*/

/**
Expand All @@ -54,7 +59,7 @@ import {ClientError, ConnectionError, ServerError, SpineError} from './errors';
class Endpoint {

/**
* Sends off a command to the endpoint.
* Sends a command to the endpoint.
*
* @param {!TypedMessage<Command>} command a Command to send to the Spine server
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -65,7 +70,7 @@ class Endpoint {
}

/**
* Sends off a query to the endpoint.
* Sends a query to the endpoint.
*
* @param {!spine.client.Query} query a Query to Spine server to retrieve some domain entities
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -77,7 +82,7 @@ class Endpoint {
}

/**
* Sends off a request to subscribe to a provided topic to an endpoint.
* Sends a request to subscribe to a provided topic to an endpoint.
*
* @param {!spine.client.Topic} topic a topic for which a subscription is created
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -89,21 +94,33 @@ class Endpoint {
}

/**
* Sends off a request to keep a subscription, stopping it from being closed by server.
* Sends a request to keep a subscription, stopping it from being closed by server.
*
* @param {!spine.client.Subscription} subscription a subscription that should be kept open
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
* @returns {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
*/
keepUpSubscription(subscription) {
keepUpSingleSubscription(subscription) {
const typedSubscription = TypedMessage.of(subscription);
return this._keepUp(typedSubscription);
}

/**
* Sends off a request to cancel an existing subscription.
* Sends a request to keep up several subscriptions, preventing them
* from being closed by the server.
*
* Cancelling subscription stops the server updating subscription with new values.
* @param {!Array<spine.client.Subscription>} subscriptions subscriptions that should be kept open
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
*/
keepUpSubscriptions(subscriptions) {
return this._keepUpAll(subscriptions);
}

/**
* Sends a request to cancel an existing subscription.
*
* Cancelling subscription stops the server from updating subscription with new values.
*
* @param {!spine.client.Subscription} subscription a subscription that should be kept open
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -114,6 +131,19 @@ class Endpoint {
return this._cancel(typedSubscription);
}

/**
* Sends a request to cancel all the given subscriptions.
*
* Cancelling subscriptions stops the server from updating subscription with new values.
*
* @param {!Array<spine.client.Subscription>>} subscriptions subscriptions that should
* be cancelled
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
*/
cancelAll(subscriptions) {
return this._cancelAll(subscriptions);
}

/**
* @param {!TypedMessage<Command>} command a Command to send to the Spine server
Expand Down Expand Up @@ -159,6 +189,17 @@ class Endpoint {
throw new Error('Not implemented in abstract base.');
}

/**
* @param {!Array<TypedMessage<spine.client.Subscription>>} subscriptions subscriptions to keep up
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
* @protected
* @abstract
*/
_keepUpAll(subscriptions) {
throw new Error('Not implemented in abstract base.');
}

/**
* @param {!TypedMessage<spine.client.Subscription>} subscription a subscription to be canceled
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -169,10 +210,22 @@ class Endpoint {
_cancel(subscription) {
throw new Error('Not implemented in abstract base.');
}


/**
* @param {!Array<spine.client.Subscription>} subscriptions subscriptions to be canceled
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
* @protected
* @abstract
*/
_cancelAll(subscriptions) {
throw new Error('Not implemented in abstract base.');
}
}

/**
* Spine HTTP endpoint which is used to send off Commands and Queries using
* Spine HTTP endpoint which is used to send Commands and Queries using
* the provided HTTP client.
*/
export class HttpEndpoint extends Endpoint {
Expand All @@ -188,12 +241,12 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a command to the endpoint.
* Sends a command to the endpoint.
*
* @param {!TypedMessage<Command>} command a Command to send to the Spine server
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_executeCommand(command) {
Expand All @@ -202,12 +255,12 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a query to the endpoint.
* Sends a query to the endpoint.
*
* @param {!TypedMessage<Query>} query a Query to Spine server to retrieve some domain entities
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_performQuery(query) {
Expand All @@ -216,12 +269,12 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a request to create a subscription for a topic.
* Sends a request to create a subscription for a topic.
*
* @param {!TypedMessage<spine.client.Topic>} topic a topic to subscribe to
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_subscribeTo(topic) {
Expand All @@ -231,13 +284,13 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a request to keep alive a subscription.
* Sends a request to keep alive the given subscription.
*
* @param {!TypedMessage<spine.client.Subscription>} subscription a subscription that is prevented
* from being closed by server
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_keepUp(subscription) {
Expand All @@ -247,12 +300,31 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a request to cancel a subscription.
* Sends a request to keep alive the given subscriptions.
*
* @param {!Array<spine.client.Subscription>} subscriptions subscriptions that are prevented
* from being closed by the server
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_keepUpAll(subscriptions) {
const path = (this._routing && this._routing.subscription && this._routing.subscription.keepUpAll)
|| '/subscription/keep-up-all';
const request = new Subscriptions()
request.setSubscriptionList(subscriptions);
const typed = TypedMessage.of(request);
return this._sendMessage(path, typed);
}

/**
* Sends a request to cancel the given subscription.
*
* @param {!TypedMessage<spine.client.Subscription>} subscription a subscription to be canceled
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_cancel(subscription) {
Expand All @@ -261,14 +333,32 @@ export class HttpEndpoint extends Endpoint {
return this._sendMessage(path, subscription);
}

/**
* Sends a request to cancel the given subscriptions.
*
* @param {!Array<spine.client.Subscription>} subscriptions subscriptions to be canceled
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_cancelAll(subscriptions) {
const path = (this._routing && this._routing.subscription && this._routing.subscription.cancelAll)
|| '/subscription/cancel-all';
const request = new Subscriptions();
request.setSubscriptionList(subscriptions);
const typed = TypedMessage.of(request);
return this._sendMessage(path, typed);
}

/**
* Sends the given message to the given endpoint.
*
* @param {!string} endpoint an endpoint to send the message to
* @param {!TypedMessage} message a message to send, as a {@link TypedMessage}
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @private
*/
_sendMessage(endpoint, message) {
Expand All @@ -286,8 +376,8 @@ export class HttpEndpoint extends Endpoint {
*
* @param {!Response} response an HTTP request response
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or if JSON
* parsing fails
* rejected if the client response is not `2xx`,
* or if JSON parsing fails
* @private
*/
static _jsonOrError(response) {
Expand Down
2 changes: 1 addition & 1 deletion client-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "spine-web",
"version": "1.9.0-SNAPSHOT.7",
"version": "1.9.0-SNAPSHOT.8",
"license": "Apache-2.0",
"description": "A JS client for interacting with Spine applications.",
"homepage": "https://spine.io",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.spine.core.Response;
import io.spine.core.Status;
import io.spine.type.TypeUrl;
import io.spine.web.Responses;
import io.spine.web.Subscriptions;
import io.spine.web.firebase.FirebaseClient;
import io.spine.web.firebase.NodePath;
import io.spine.web.firebase.RequestNodePath;
Expand Down Expand Up @@ -107,6 +109,18 @@ public Response keepUp(Subscription subscription) {
return exists ? ok() : missing(subscription);
}

@Override
public Responses keepUpAll(Subscriptions subscriptions) {
checkNotNull(subscriptions);
return subscriptions.getSubscriptionList()
.stream()
.map(this::keepUp)
.collect(Responses::newBuilder,
Responses.Builder::addResponse,
(l, r) -> l.addAllResponse(r.getResponseList()))
.vBuild();
}

@Override
public Response cancel(Subscription subscription) {
checkNotNull(subscription);
Expand All @@ -120,6 +134,18 @@ public Response cancel(Subscription subscription) {
return localSubscription.isPresent() ? ok() : missing(subscription);
}

@Override
public Responses cancelAll(Subscriptions request) {
checkNotNull(request);
Responses.Builder result = Responses.newBuilder();
for (Subscription subscription : request.getSubscriptionList()) {
Response response = cancel(subscription);
result.addResponse(response);
}
return result.vBuild();
}


private static Response missing(Subscription subscription) {
String errorMessage =
format("Subscription `%s` is unknown or already canceled.",
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/js-tests/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "client-js-tests",
"version": "1.9.0-SNAPSHOT.7",
"version": "1.9.0-SNAPSHOT.8",
"license": "Apache-2.0",
"description": "Tests of a `spine-web` JS library against the Spine-based application.",
"scripts": {
Expand Down
Loading