Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client-js/main/client/client-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import {HttpEndpoint} from "./http-endpoint";
* @property {?TenantProvider} tenantProvider
* the provider of an active tenant ID, if not specified, the application is considered
* single-tenant
* @property {?Duration} subscriptionKeepUpInterval
* the custom interval for sending requests to keep up subscriptions
*/

/**
Expand Down
18 changes: 8 additions & 10 deletions client-js/main/client/firebase-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,9 @@ class FirebaseSubscribingClient extends SubscribingClient {
const itemRemoved = new Subject();

const pathSubscriptions = [
this._firebase
.onChildAdded(path, itemAdded.next.bind(itemAdded)),
this._firebase
.onChildChanged(path, itemChanged.next.bind(itemChanged)),
this._firebase
.onChildRemoved(path, itemRemoved.next.bind(itemRemoved))
this._firebase.onChildAdded(path, itemAdded),
this._firebase.onChildChanged(path, itemChanged),
this._firebase.onChildRemoved(path, itemRemoved)
];

const typeUrl = subscription.getTopic().getTarget().getType();
Expand All @@ -263,8 +260,7 @@ class FirebaseSubscribingClient extends SubscribingClient {
*/
_eventSubscription(path, subscription) {
const itemAdded = new Subject();
const pathSubscription =
this._firebase.onChildAdded(path, itemAdded.next.bind(itemAdded));
const pathSubscription = this._firebase.onChildAdded(path, itemAdded);

return new EventSubscription({
unsubscribedBy: () => {
Expand Down Expand Up @@ -330,7 +326,8 @@ export class FirebaseClientFactory extends AbstractClientFactory {
const endpoint = new HttpEndpoint(httpClient, options.routing);
const firebaseDatabaseClient = new FirebaseDatabaseClient(options.firebaseDatabase);
const requestFactory = ActorRequestFactory.create(options);
const subscriptionService = new FirebaseSubscriptionService(endpoint);
const subscriptionService =
new FirebaseSubscriptionService(endpoint, options.subscriptionKeepUpInterval);

const querying = new FirebaseQueryingClient(endpoint, firebaseDatabaseClient, requestFactory);
const subscribing = new FirebaseSubscribingClient(endpoint,
Expand All @@ -355,7 +352,8 @@ export class FirebaseClientFactory extends AbstractClientFactory {
const endpoint = new HttpEndpoint(httpClient, options.routing);
const firebaseDatabaseClient = new FirebaseDatabaseClient(options.firebaseDatabase);
const requestFactory = ActorRequestFactory.create(options);
const subscriptionService = new FirebaseSubscriptionService(endpoint);
const subscriptionService =
new FirebaseSubscriptionService(endpoint, options.subscriptionKeepUpInterval);

return new FirebaseSubscribingClient(endpoint,
firebaseDatabaseClient,
Expand Down
25 changes: 13 additions & 12 deletions client-js/main/client/firebase-database-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

"use strict";

import {Subscription} from 'rxjs';
import {Subscription, Subject} from 'rxjs';

/**
* The client of a Firebase Realtime database.
Expand All @@ -42,12 +42,12 @@ export class FirebaseDatabaseClient {
* Each child's value is parsed as a JSON and dispatched to the given callback
*
* @param {!string} path the path to the watched node
* @param {!consumerCallback<Object>} dataCallback the child value callback
* @param {!Subject<Object>} dataSubject the subject receiving child values
*
* @return {Subscription} a Subscription that can be unsubscribed
*/
onChildAdded(path, dataCallback) {
return this._subscribeToChildEvent('child_added', path, dataCallback);
onChildAdded(path, dataSubject) {
return this._subscribeToChildEvent('child_added', path, dataSubject);
}

/**
Expand All @@ -56,12 +56,12 @@ export class FirebaseDatabaseClient {
* Each child's value is parsed as a JSON and dispatched to the given callback
*
* @param {!string} path the path to the watched node
* @param {!consumerCallback<Object>} dataCallback the child value callback
* @param {!Subject<Object>} dataSubject the subject receiving child values
*
* @return {Subscription} a Subscription that can be unsubscribed
*/
onChildChanged(path, dataCallback) {
return this._subscribeToChildEvent('child_changed', path, dataCallback);
onChildChanged(path, dataSubject) {
return this._subscribeToChildEvent('child_changed', path, dataSubject);
}

/**
Expand All @@ -70,23 +70,24 @@ export class FirebaseDatabaseClient {
* Each child's value is parsed as a JSON and dispatched to the given callback
*
* @param {!string} path the path to the watched node
* @param {!consumerCallback<Object>} dataCallback the child value callback
* @param {!Subject<Object>} dataSubject the subject receiving child values
*
* @return {Subscription} a Subscription that can be unsubscribed
*/
onChildRemoved(path, dataCallback) {
return this._subscribeToChildEvent('child_removed', path, dataCallback);
onChildRemoved(path, dataSubject) {
return this._subscribeToChildEvent('child_removed', path, dataSubject);
}

_subscribeToChildEvent(childEvent, path, dataCallback) {
_subscribeToChildEvent(childEvent, path, dataSubject) {
const dbRef = this._database.ref(path);
const callback = dbRef.on(childEvent, response => {
const msgJson = response.val();
const message = JSON.parse(msgJson);
dataCallback(message);
dataSubject.next(message);
});
return new Subscription(() => {
dbRef.off(childEvent, callback);
dataSubject.complete();
});
}

Expand Down
78 changes: 51 additions & 27 deletions client-js/main/client/firebase-subscription-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@
"use strict";

import {Duration} from './time-utils';
import ObjectToProto from "./object-to-proto";
import ObjectToProto from './object-to-proto';
import {Status} from '../proto/spine/core/response_pb';


const SUBSCRIPTION_KEEP_UP_INTERVAL = new Duration({minutes: 2});
/**
* The default interval for sending subscription keep up requests.
*
* @type {Duration}
*/
const DEFAULT_KEEP_UP_INTERVAL = new Duration({minutes: 2});

/**
* A service that manages the active subscriptions periodically sending requests to keep them
* running.
*/
export class FirebaseSubscriptionService {

/**
* @param {Endpoint} endpoint an endpoint to communicate with
* @param {?Duration} keepUpInterval a custom interval for sending subscription keep up requests
*/
constructor(endpoint) {
constructor(endpoint, keepUpInterval) {
/**
* @type {SpineSubscription[]}
* @private
Expand All @@ -46,6 +52,13 @@ export class FirebaseSubscriptionService {
* @private
*/
this._endpoint = endpoint;
/**
* @type {Duration}
* @private
*/
this._keepUpInterval = keepUpInterval
? keepUpInterval
: DEFAULT_KEEP_UP_INTERVAL;
}

/**
Expand All @@ -59,19 +72,41 @@ export class FirebaseSubscriptionService {
throw new Error('This subscription is already registered in subscription service');
}
this._subscriptions.push(subscription);

if (!this._isRunning()) {
this._run();
}
}

/**
* Starts the subscription service, keeping up the added subscriptions.
* Indicates whether this service is running keeping up subscriptions.
*
* @returns {boolean}
* @private
*/
run() {
if (this._interval) {
throw new Error('The FirebaseSubscriptionService is already running');
}
_isRunning() {
return !!this._interval;
}

/**
* Starts the subscription service, keeping up the added subscriptions.
*
* @private
*/
_run() {
this._interval = setInterval(() => {
this._keepUpSubscriptions();
}, SUBSCRIPTION_KEEP_UP_INTERVAL.inMs());
}, this._keepUpInterval.inMs());
}

/**
* Stops the subscription service.
*
* @private
*/
_stop() {
clearInterval(this._interval);
this._interval = null;
}

/**
Expand All @@ -93,7 +128,7 @@ export class FirebaseSubscriptionService {
} else {
this._endpoint.keepUpSubscription(spineSubscription).then(response => {
const responseStatus = response.status;
const responseStatusProto = ObjectToProto.convert(responseStatus, _statusType);
const responseStatusProto = ObjectToProto.convert(responseStatus, Status.typeUrl());
if (responseStatusProto.getStatusCase() !== Status.StatusCase.OK) {
this._removeSubscription(subscription)
}
Expand All @@ -102,30 +137,19 @@ export class FirebaseSubscriptionService {
});
}

/**
* Stops the subscription service unsubscribing and removing all added subscriptions.
*/
stop() {
if (!this._interval) {
throw new Error('The FirebaseSubscriptionService was stopped when it was not running');
}
clearInterval(this._interval);
this._subscriptions.forEach(subscription => {
subscription.unsubscribe();
this._removeSubscription(subscription);
});
this._interval = null;
}

/**
* Removes the provided subscription from subscriptions list, which stops any attempts
* to update it.
* to update it. In case no more subscriptions are left, stops this service.
*
* @private
*/
_removeSubscription(subscription) {
const index = this._subscriptions.indexOf(subscription);
this._subscriptions.splice(index, 1);

if (this._subscriptions.length === 0) {
this._stop();
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions client-js/main/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ export {Client} from './client/client';
export {init} from './client/spine';
export {TenantIds, TenantProvider} from './client/tenant';
export * from './client/errors';
export {Duration} from './client/time-utils';
2 changes: 1 addition & 1 deletion client-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "spine-web",
"version": "1.6.0",
"version": "1.6.1",
"license": "Apache-2.0",
"description": "A JS client for interacting with Spine applications.",
"homepage": "https://spine.io",
Expand Down
1 change: 1 addition & 0 deletions integration-tests/js-tests/.babelrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"@babel/preset-env"
],
"plugins": [
"@babel/plugin-transform-runtime",
[
"module-resolver",
{
Expand Down
4 changes: 3 additions & 1 deletion integration-tests/js-tests/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "client-js-tests",
"version": "1.6.0",
"version": "1.6.1",
"license": "Apache-2.0",
"description": "Tests of a `spine-web` JS library against the Spine-based application.",
"scripts": {
Expand All @@ -9,6 +9,7 @@
},
"devDependencies": {
"@babel/core": "^7.0.0",
"@babel/plugin-transform-runtime": "^7.12.1",
"@babel/preset-env": "^7.0.0",
"@babel/register": "^7.0.0",
"assert": "^2.0.0",
Expand All @@ -17,6 +18,7 @@
"google-protobuf": "^3.8.0",
"mocha": "^7.2.0",
"rxjs": "~6.5.1",
"sinon": "^9.2.0",
"uuid": "^3.4.0"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@ import TestEnvironment from "../../given/test-environment";
*
* @param {!string} endpointUrl the URL of a backend to interact with
* @param {?TenantProvider} tenantProvider the tenant provider for multitenant context tests
* @param {?Duration} keepUpInterval the custom interval for sending requests to
* keep up subscriptions in tests
* @return {FirebaseClient} the Firebase client instance
*/
export function initClient(endpointUrl, tenantProvider) {
export function initClient(endpointUrl, tenantProvider, keepUpInterval) {
return spineWeb.init({
protoIndexFiles: [testProtobuf],
endpointUrl: endpointUrl,
firebaseDatabase: firebaseDatabase,
actorProvider: new ActorProvider(),
tenantProvider: tenantProvider
tenantProvider: tenantProvider,
subscriptionKeepUpInterval: keepUpInterval
});
}

Expand Down
Loading