Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client-js/main/client/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ export class Client {
throw new Error('Not implemented in abstract base.');
}

/**
* Immediately cancels all active subscriptions.
*
* This endpoint is handy to use when an end-user chooses to end her session
* with the web app. E.g. all subscriptions should be cancelled upon user sign-out.
*/
cancelAllSubscriptions() {
throw new Error('Not implemented in abstract base.');
}

/**
* Subscribes to the given `Topic` instance.
*
Expand Down
7 changes: 7 additions & 0 deletions client-js/main/client/composite-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ export class CompositeClient extends Client {
return this._subscribing.subscribeTo(entityType, this);
}

/**
* @override
*/
cancelAllSubscriptions() {
this._subscribing.cancelAllSubscriptions();
}

/**
* @override
*/
Expand Down
30 changes: 25 additions & 5 deletions client-js/main/client/firebase-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import {ActorRequestFactory} from './actor-request-factory';
import {AbstractClientFactory} from './client-factory';
import {CommandingClient} from "./commanding-client";
import {CompositeClient} from "./composite-client";
import {HttpClient} from './http-client';
import {HttpEndpoint} from './http-endpoint';
import {FirebaseDatabaseClient} from './firebase-database-client';
import {FirebaseSubscriptionService} from './firebase-subscription-service';
Expand Down Expand Up @@ -84,9 +83,9 @@ class SpineSubscription extends Subscription {
class EntitySubscription extends SpineSubscription {

/**
* @param {Function} unsubscribe the callbacks that allows to cancel the subscription
* @param {Function} unsubscribe the callback that allows to cancel the subscription
* @param {{itemAdded: Observable, itemChanged: Observable, itemRemoved: Observable}} observables
* the observables for entity changes
* the observables for entity change
* @param {SubscriptionObject} subscription the wrapped subscription object
*/
constructor({
Expand All @@ -102,7 +101,13 @@ class EntitySubscription extends SpineSubscription {
* @return {EntitySubscriptionObject} a plain object with observables and unsubscribe method
*/
toObject() {
return Object.assign({}, this._observables, {unsubscribe: () => this.unsubscribe()});
return Object.assign({},
this._observables,
{
unsubscribe: () => {
return this.unsubscribe();
}
});
}
}

Expand Down Expand Up @@ -130,7 +135,13 @@ class EventSubscription extends SpineSubscription {
*/
toObject() {
return Object.assign(
{}, {eventEmitted: this._observable}, {unsubscribe: () => this.unsubscribe()}
{},
{eventEmitted: this._observable},
{
unsubscribe: () => {
return this.unsubscribe();
}
}
);
}
}
Expand Down Expand Up @@ -243,6 +254,7 @@ class FirebaseSubscribingClient extends SubscribingClient {
return new EntitySubscription({
unsubscribedBy: () => {
FirebaseSubscribingClient._unsubscribe(pathSubscriptions);
this._subscriptionService.cancelSubscription(subscription);
},
withObservables: {
itemAdded: ObjectToProto.map(itemAdded.asObservable(), typeUrl),
Expand All @@ -263,12 +275,20 @@ class FirebaseSubscribingClient extends SubscribingClient {
return new EventSubscription({
unsubscribedBy: () => {
FirebaseSubscribingClient._unsubscribe([pathSubscription]);
this._subscriptionService.cancelSubscription(subscription);
},
withObservable: ObjectToProto.map(itemAdded.asObservable(), EVENT_TYPE_URL),
forInternal: subscription
});
}

/**
* @override
*/
cancelAllSubscriptions() {
this._subscriptionService.cancelAllSubscriptions();
}

/**
* Unsubscribes the provided Firebase subscriptions.
*
Expand Down
45 changes: 42 additions & 3 deletions client-js/main/client/firebase-subscription-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import {Status} from '../proto/spine/core/response_pb';
const DEFAULT_KEEP_UP_INTERVAL = new Duration({minutes: 2});

/**
* A service that manages the active subscriptions periodically sending requests to keep them
* running.
* A service that manages the active subscriptions periodically sending requests
* to keep them running.
*/
export class FirebaseSubscriptionService {

Expand Down Expand Up @@ -84,6 +84,33 @@ export class FirebaseSubscriptionService {
}
}

/**
* Immediately cancels all active subscriptions previously created through this service.
*/
cancelAllSubscriptions() {
const activeSubscriptions = this._subscriptions.filter(s => !s.closed);
if (activeSubscriptions.length > 0) {
const subscriptionMessages = activeSubscriptions.map(s => s.internal())
this._endpoint.cancelAll(subscriptionMessages);
activeSubscriptions.forEach(s => {
s.unsubscribe() /* Calling RxJS's `unsubscribe` to stop propagating the updates. */
this._removeSubscription(s)
})
}
}

/**
* Immediately cancels the given subscription, including cancelling it on the server-side.
*
* @param {SubscriptionObject} subscription the subscription to cancel
*/
cancelSubscription(subscription) {
this._endpoint.cancelSubscription(subscription)
.then(() => {
this._removeSubscription(subscription)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here subscription is a Protobuf message, _removeSubscription expects something derived from RxJS Subscription. I guess we should rework _removeSubscription method so it could remove subscriptions by ID.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YegorUdovchenko
That's an interesting point. Perhaps, you are right.

However, I don't understand how this thing worked previously. It uses subscription.internal() objects, which AFAIU are Protobuf messages.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it actually works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YegorUdovchenko
I have reworked this removal thing to take any kind of Subscription, and operate by its ID. I think, that's the safest approach for now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@armiol
Looks ok.

});
}

/**
* Indicates whether this service is running keeping up subscriptions.
*
Expand Down Expand Up @@ -150,10 +177,22 @@ export class FirebaseSubscriptionService {
* Removes the provided subscription from subscriptions list, which stops any attempts
* to update it. In case no more subscriptions are left, stops this service.
*
* In case the passed subscription is not known to this service, does nothing.
*
* @param subscription a subscription to cancel;
* this method accepts values of both `SpineSubscription`
* and Proto `Subscription` types,
* and operates based on the subscription ID
* @private
*/
_removeSubscription(subscription) {
const index = this._subscriptions.indexOf(subscription);
let id;
if (typeof subscription.id === 'function') {
id = subscription.id();
} else {
id = subscription.getId().getValue();
}
const index = this._subscriptions.findIndex(item => item.id() === id);
this._subscriptions.splice(index, 1);

if (this._subscriptions.length === 0) {
Expand Down
16 changes: 16 additions & 0 deletions client-js/main/client/subscribing-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ export class SubscribingClient {
throw new Error('Not implemented in abstract base.');
}

/**
* Cancels all subscriptions, which were created through this instance of subscribing client.
*/
cancelAllSubscriptions() {
throw new Error('Not implemented in abstract base.');
}

/**
* Returns a new topic factory instance which can be further used for the `Topic` creation.
*
Expand Down Expand Up @@ -123,4 +130,13 @@ export class NoOpSubscribingClient extends SubscribingClient {
subscribeToEvents(topic) {
throw new Error(SUBSCRIPTIONS_NOT_SUPPORTED);
}

/**
* Always throws an error.
*
* @override
*/
cancelAllSubscriptions() {
throw new Error(SUBSCRIPTIONS_NOT_SUPPORTED);
}
}
2 changes: 1 addition & 1 deletion client-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "spine-web",
"version": "1.9.0-SNAPSHOT.9",
"version": "1.9.0-SNAPSHOT.10",
"license": "Apache-2.0",
"description": "A JS client for interacting with Spine applications.",
"homepage": "https://spine.io",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@
*
* <p>See <a href="https://firebase.google.com/docs/database/rest/auth">Firebase REST docs</a>.
*/
@SuppressWarnings("deprecation")
// Use deprecated `GoogleCredential` to retain backward compatibility.
@SuppressWarnings("deprecation" /*`GoogleCredential` is used to retain backward compatibility.*/)
public final class FirebaseCredentials implements HttpRequestInitializer {

private static final String AUTH_DATABASE = "https://www.googleapis.com/auth/firebase.database";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,18 @@ public Builder setSubscriptionLifeSpan(Duration subscriptionLifeSpan) {
/**
* Creates a new instance of {@code FirebaseQueryBridge}.
*
* <p>Mandatory fields are {@link #setFirebaseClient(FirebaseClient) firebaseClient}
* and {@link #setSubscriptionService(SubscriptionServiceImplBase) subscriptionService}.
*
* @return new instance of {@code FirebaseQueryBridge}
*/
public FirebaseSubscriptionBridge build() {
checkState(firebaseClient != null,
"Firebase database client is not set to FirebaseSubscriptionBridge.");
"Mandatory Firebase database client" +
" is not specified for `FirebaseSubscriptionBridge`.");
checkState(subscriptionService != null,
"Subscription Service is not set to FirebaseSubscriptionBridge.");
"Mandatory Subscription Service is not specified" +
" for `FirebaseSubscriptionBridge`.");
return new FirebaseSubscriptionBridge(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* <p>To understand whether a client is still listening to the {@code Topic} updates, she
* periodically sends a {@link FirebaseSubscriptionBridge#keepUp(Subscription) keepUp(Subscription)}
* request. The server records the timestamps of these requests in this log and counts the client
* alive, as long as the {@linkplain #withTimeout(Duration)} configured} timeout does not pass
* alive, as long as the {@linkplain #withTimeout(Duration) configured} timeout does not pass
* since the last request.
*/
final class HealthLog {
Expand Down Expand Up @@ -113,4 +113,15 @@ boolean isStale(Topic topic) {
Duration elapsed = between(lastUpdate, now);
return compare(elapsed, expirationTimeout) > 0;
}

/**
* Removes the given {@code Topic} from this health log.
*
* <p>In case this topic is not known to this registry, does nothing, allowing
* to safely clear the health log from stale topics potentially residing in storage
* on either client- or server-sides.
*/
void remove(Topic topic) {
updateTimes.remove(topic.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.google.protobuf.Duration;
import io.spine.client.Subscription;
import io.spine.client.Topic;
import io.spine.json.Json;
import io.spine.web.firebase.FirebaseClient;
import io.spine.web.firebase.NodePath;
import io.spine.web.firebase.NodePaths;
Expand All @@ -44,6 +43,7 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.spine.json.Json.fromJson;
import static io.spine.util.Exceptions.illegalStateWithCauseOf;
import static io.spine.web.firebase.subscription.LazyRepository.lazy;

Expand Down Expand Up @@ -81,7 +81,7 @@ final class SubscriptionRepository {
/**
* Fetches all the existing subscriptions from the Firebase and activates them.
*
* <p>After calling this method, all the new subscriptions are automatically activates on this
* <p>After calling this method, all the new subscriptions are automatically activated on this
* server instance.
*/
void subscribeToAll() {
Expand Down Expand Up @@ -152,6 +152,7 @@ private void delete(Topic topic) {
checkNotNull(topic);
NodePath path = pathForSubscription(topic);
firebase.delete(path);
healthLog.remove(topic);
}

private static NodePath pathForSubscription(Topic topic) {
Expand Down Expand Up @@ -200,7 +201,7 @@ private static String asJson(DataSnapshot snapshot) {
}

private Topic loadTopic(String json) {
Topic topic = Json.fromJson(json, Topic.class);
Topic topic = fromJson(json, Topic.class);
repository.healthLog.put(topic);
return topic;
}
Expand All @@ -216,7 +217,31 @@ private void deleteOrActivate(Topic topic) {

@Override
public void onChildRemoved(DataSnapshot snapshot) {
// NOP.
String json = asJson(snapshot);
Optional<Topic> topic = parseTopic(json);
topic.ifPresent(t -> {
HealthLog healthLog = repository.healthLog;
if (healthLog.isKnown(t)) {
repository.delete(t);
}
});
}

/**
* Safely parses the {@code Topic} from the passed JSON.
*
* @param json
* JSON to parse
* @return parsed {@code Topic} wrapped as {@code Optional},
* or {@code Optional.empty()} if there was a parsing error
*/
private static Optional<Topic> parseTopic(String json) {
try {
Topic topic = fromJson(json, Topic.class);
return Optional.of(topic);
} catch (RuntimeException e) {
return Optional.empty();
}
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/js-tests/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "client-js-tests",
"version": "1.9.0-SNAPSHOT.9",
"version": "1.9.0-SNAPSHOT.10",
"license": "Apache-2.0",
"description": "Tests of a `spine-web` JS library against the Spine-based application.",
"scripts": {
Expand Down
Loading