Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 85 additions & 15 deletions src/libs/Pusher/index.native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {authenticatePusher} from '@userActions/Session';
import CONST from '@src/CONST';
import ONYXKEYS from '@src/ONYXKEYS';
import TYPE from './EventType';
import type {Args, ChunkedDataEvents, EventCallbackError, EventData, PusherEventName, SocketEventCallback, SocketEventName, States} from './types';
import type {Args, ChunkedDataEvents, EventCallbackError, EventData, PusherEventName, PusherSubscription, SocketEventCallback, SocketEventName, States} from './types';
import type PusherModule from './types';

let shouldForceOffline = false;
Expand All @@ -34,7 +34,9 @@ let initPromise = new Promise<void>((resolve) => {
resolveInitPromise = resolve;
});

const eventsBoundToChannels = new Map<string, Map<PusherEventName, (eventData: EventData<PusherEventName>) => void>>();
type BoundCallback = (eventData: EventData<PusherEventName>) => void;

const eventsBoundToChannels = new Map<string, Map<PusherEventName, Set<BoundCallback>>>();
let channels: Record<string, ValueOf<typeof CONST.PUSHER.CHANNEL_STATUS>> = {};

/**
Expand Down Expand Up @@ -126,11 +128,16 @@ function parseEventData<EventName extends PusherEventName>(eventData: EventData<
}

/**
* Binds an event callback to a channel + eventName
* Binds an event callback to a channel + eventName.
* Returns the wrapped callback so it can be individually unbound later.
*/
function bindEventToChannel<EventName extends PusherEventName>(channel: string, eventName?: EventName, eventCallback: (data: EventData<EventName>) => void = () => {}) {
function bindEventToChannel<EventName extends PusherEventName>(
channel: string,
eventName?: EventName,
eventCallback: (data: EventData<EventName>) => void = () => {},
): BoundCallback | undefined {
if (!eventName) {
return;
return undefined;
}

const chunkedDataEvents: Record<string, ChunkedDataEvents> = {};
Expand Down Expand Up @@ -192,24 +199,40 @@ function bindEventToChannel<EventName extends PusherEventName>(channel: string,
if (!eventsBoundToChannels.has(channel)) {
eventsBoundToChannels.set(channel, new Map());
}
const eventMap = eventsBoundToChannels.get(channel);
if (!eventMap?.has(eventName)) {
eventMap?.set(eventName, new Set());
}
const boundCb = callback as BoundCallback;
eventMap?.get(eventName)?.add(boundCb);

eventsBoundToChannels.get(channel)?.set(eventName, callback as (eventData: EventData<PusherEventName>) => void);
return boundCb;
}

/**
* Subscribe to a channel and an event
* Subscribe to a channel and an event.
* Returns a PusherSubscription — a Promise (for backward-compatible .catch()/.then())
* with an .unsubscribe() method that removes only this specific callback.
*/
function subscribe<EventName extends PusherEventName>(
channelName: string,
eventName?: EventName,
eventCallback: (data: EventData<EventName>) => void = () => {},
onResubscribe = () => {},
): Promise<void> {
return initPromise.then(
): PusherSubscription {
let wrappedCb: BoundCallback | undefined;
let disposed = false;

const promise = initPromise.then(
() =>
new Promise((resolve, reject) => {
new Promise<void>((resolve, reject) => {
// eslint-disable-next-line @typescript-eslint/no-deprecated
InteractionManager.runAfterInteractions(() => {
if (disposed) {
resolve();
return;
}

// We cannot call subscribe() before init(). Prevent any attempt to do this on dev.
if (!socket) {
const error = new Error('[Pusher] instance not found. Pusher.subscribe() most likely has been called before Pusher.init()');
Expand Down Expand Up @@ -237,12 +260,27 @@ function subscribe<EventName extends PusherEventName>(
socket.subscribe({
channelName,
onEvent: (event) => {
const callback = eventsBoundToChannels.get(event.channelName)?.get(event.eventName);
callback?.(event.data as EventData<PusherEventName>);
const callbacks = eventsBoundToChannels.get(event.channelName)?.get(event.eventName);
if (callbacks) {
for (const cb of callbacks) {
cb(event.data as EventData<PusherEventName>);
}
}
},
onSubscriptionSucceeded: () => {
channels[channelName] = CONST.PUSHER.CHANNEL_STATUS.SUBSCRIBED;
bindEventToChannel(channelName, eventName, eventCallback);
if (!disposed) {
wrappedCb = bindEventToChannel(channelName, eventName, eventCallback);
} else {
// Handle was disposed mid-handshake — clean up the channel
// if no other subscribers have bound callbacks to it
const eventMap = eventsBoundToChannels.get(channelName);
if (!eventMap || eventMap.size === 0) {
eventsBoundToChannels.delete(channelName);
delete channels[channelName];
socket?.unsubscribe({channelName});
}
}
resolve();
// When subscribing for the first time we register a success callback that can be
// called multiple times when the subscription succeeds again in the future
Expand All @@ -260,16 +298,48 @@ function subscribe<EventName extends PusherEventName>(
},
});
} else {
bindEventToChannel(channelName, eventName, eventCallback);
if (!disposed) {
wrappedCb = bindEventToChannel(channelName, eventName, eventCallback);
}
resolve();
}
});
}),
);

return Object.assign(promise, {
unsubscribe: () => {
disposed = true;
if (!wrappedCb || !eventName) {
return;
}

// 1. Remove this specific callback from tracking
const eventMap = eventsBoundToChannels.get(channelName);
const callbacks = eventMap?.get(eventName);
callbacks?.delete(wrappedCb);

// 2. If last callback for this event, remove the event
if (callbacks?.size === 0) {
eventMap?.delete(eventName);
}

// 3. If last event on this channel, unsubscribe entirely
if (eventMap?.size === 0) {
eventsBoundToChannels.delete(channelName);
delete channels[channelName];
socket?.unsubscribe({channelName});
}

wrappedCb = undefined;
},
});
}

/**
* Unsubscribe from a channel and optionally a specific event
* Unsubscribe from a channel and optionally a specific event.
* This removes ALL callbacks for the given event (or all events on the channel).
* For per-callback removal, use the .unsubscribe() method on the PusherSubscription handle.
*/
function unsubscribe(channelName: string, eventName: PusherEventName = '') {
const channel = getChannel(channelName);
Expand Down
102 changes: 89 additions & 13 deletions src/libs/Pusher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type {
EventCallbackError,
EventData,
PusherEventName,
PusherSubscription,
PusherSubscriptionErrorData,
PusherWithAuthParams,
SocketEventCallback,
Expand Down Expand Up @@ -45,7 +46,10 @@ let initPromise = new Promise<void>((resolve) => {
resolveInitPromise = resolve;
});

const eventsBoundToChannels = new Map<Channel, Set<PusherEventName>>();
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- Pusher callbacks have varying signatures due to chunking wrapper
type BoundCallback = (eventData: any) => void;

const eventsBoundToChannels = new Map<Channel, Map<PusherEventName, Set<BoundCallback>>>();

/**
* Trigger each of the socket event callbacks with the event information
Expand Down Expand Up @@ -118,11 +122,16 @@ function getChannel(channelName: string): Channel | undefined {
}

/**
* Binds an event callback to a channel + eventName
* Binds an event callback to a channel + eventName.
* Returns the wrapped callback so it can be individually unbound later.
*/
function bindEventToChannel<EventName extends PusherEventName>(channel: Channel | undefined, eventName?: EventName, eventCallback: (data: EventData<EventName>) => void = () => {}) {
function bindEventToChannel<EventName extends PusherEventName>(
channel: Channel | undefined,
eventName?: EventName,
eventCallback: (data: EventData<EventName>) => void = () => {},
): BoundCallback | undefined {
if (!eventName || !channel) {
return;
return undefined;
}

const chunkedDataEvents: Record<string, ChunkedDataEvents> = {};
Expand Down Expand Up @@ -184,27 +193,45 @@ function bindEventToChannel<EventName extends PusherEventName>(channel: Channel
};

channel.bind(eventName, callback);

if (!eventsBoundToChannels.has(channel)) {
eventsBoundToChannels.set(channel, new Set());
eventsBoundToChannels.set(channel, new Map());
}
const eventMap = eventsBoundToChannels.get(channel);
if (!eventMap?.has(eventName)) {
eventMap?.set(eventName, new Set());
}
eventsBoundToChannels.get(channel)?.add(eventName);
eventMap?.get(eventName)?.add(callback);

return callback;
}

/**
* Subscribe to a channel and an event
* Subscribe to a channel and an event.
* Returns a PusherSubscription — a Promise (for backward-compatible .catch()/.then())
* with an .unsubscribe() method that removes only this specific callback.
* @param [onResubscribe] Callback to be called when reconnection happen
*/
function subscribe<EventName extends PusherEventName>(
channelName: string,
eventName?: EventName,
eventCallback: (data: EventData<EventName>) => void = () => {},
onResubscribe = () => {},
): Promise<void> {
return initPromise.then(
): PusherSubscription {
let wrappedCb: BoundCallback | undefined;
let resolvedChannel: Channel | undefined;
let disposed = false;

const promise = initPromise.then(
() =>
new Promise((resolve, reject) => {
new Promise<void>((resolve, reject) => {
// eslint-disable-next-line @typescript-eslint/no-deprecated
InteractionManager.runAfterInteractions(() => {
if (disposed) {
resolve();
return;
}

// We cannot call subscribe() before init(). Prevent any attempt to do this on dev.
if (!socket) {
const error = new Error('[Pusher] instance not found. Pusher.subscribe() most likely has been called before Pusher.init()');
Expand Down Expand Up @@ -234,7 +261,18 @@ function subscribe<EventName extends PusherEventName>(
channel.bind('pusher:subscription_succeeded', () => {
// Check so that we do not bind another event with each reconnect attempt
if (!isBound) {
bindEventToChannel(channel, eventName, eventCallback);
if (!disposed) {
wrappedCb = bindEventToChannel(channel, eventName, eventCallback);
resolvedChannel = channel ?? undefined;
} else if (channel) {
// Handle was disposed mid-handshake — clean up the channel
// if no other subscribers have bound callbacks to it
const eventMap = eventsBoundToChannels.get(channel);
if (!eventMap || eventMap.size === 0) {
eventsBoundToChannels.delete(channel);
socket?.unsubscribe(channelName);
}
}
resolve();
isBound = true;
return;
Expand All @@ -258,16 +296,52 @@ function subscribe<EventName extends PusherEventName>(
reject(error);
});
} else {
bindEventToChannel(channel, eventName, eventCallback);
if (!disposed) {
wrappedCb = bindEventToChannel(channel, eventName, eventCallback);
resolvedChannel = channel;
}
resolve();
}
});
}),
);

return Object.assign(promise, {
unsubscribe: () => {
disposed = true;
if (!wrappedCb || !resolvedChannel || !eventName) {
return;
}

// 1. Unbind this specific callback from pusher-js
resolvedChannel.unbind(eventName, wrappedCb);

// 2. Remove from tracking
const eventMap = eventsBoundToChannels.get(resolvedChannel);
const callbacks = eventMap?.get(eventName);
callbacks?.delete(wrappedCb);

// 3. If last callback for this event, remove the event
if (callbacks?.size === 0) {
eventMap?.delete(eventName);
}

// 4. If last event on this channel, unsubscribe entirely
if (eventMap?.size === 0) {
eventsBoundToChannels.delete(resolvedChannel);
socket?.unsubscribe(channelName);
}

wrappedCb = undefined;
resolvedChannel = undefined;
},
});
}

/**
* Unsubscribe from a channel and optionally a specific event
* Unsubscribe from a channel and optionally a specific event.
* This removes ALL callbacks for the given event (or all events on the channel).
* For per-callback removal, use the .unsubscribe() method on the PusherSubscription handle.
*/
function unsubscribe(channelName: string, eventName: PusherEventName = '') {
const channel = getChannel(channelName);
Expand All @@ -294,6 +368,7 @@ function unsubscribe(channelName: string, eventName: PusherEventName = '') {
Log.info('[Pusher] Unsubscribing from channel', false, {channelName});

channel.unbind();
eventsBoundToChannels.delete(channel);
socket?.unsubscribe(channelName);
}
}
Expand Down Expand Up @@ -369,6 +444,7 @@ function disconnect() {
socket.disconnect();
socket = null;
pusherSocketID = '';
eventsBoundToChannels.clear();
initPromise = new Promise((resolve) => {
resolveInitPromise = resolve;
});
Expand Down
7 changes: 6 additions & 1 deletion src/libs/Pusher/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,18 @@ type PusherEventName = LiteralUnion<DeepValueOf<typeof TYPE>, string>;

type PusherSubscriptionErrorData = {type?: string; error?: string; status?: string};

type PusherSubscription = Promise<void> & {
unsubscribe: () => void;
};

type PusherModule = {
init: (args: Args) => Promise<void>;
subscribe: <EventName extends PusherEventName>(
channelName: string,
eventName?: EventName,
eventCallback?: (data: EventData<EventName>) => void,
onResubscribe?: () => void,
) => Promise<void>;
) => PusherSubscription;
unsubscribe: (channelName: string, eventName?: PusherEventName) => void;
getChannel: (channelName: string) => Channel | PusherChannel | undefined;
isSubscribed: (channelName: string) => boolean;
Expand Down Expand Up @@ -105,5 +109,6 @@ export type {
SocketEventCallback,
PusherWithAuthParams,
PusherEventName,
PusherSubscription,
PusherSubscriptionErrorData,
};
Loading
Loading