Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions src/libs/API/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ addMiddleware(SaveResponseInOnyx);
// FraudMonitoring - Tags the request with the appropriate Fraud Protection event.
addMiddleware(FraudMonitoring);

// Use timestamp-based IDs to avoid collisions between browser tabs.
// Each tab has its own JS context with its own counter, so a simple
// incrementing number would collide across tabs.
let requestIndex = Date.now();
let requestIndex = 0;

/**
* Prepare the request to be sent. Bind data together with request metadata and apply optimistic Onyx data.
Expand Down Expand Up @@ -125,13 +122,13 @@ function prepareRequest<TCommand extends ApiCommand, TKey extends OnyxKey>(
/**
* Process a prepared request according to its type.
*/
async function processRequest<TKey extends OnyxKey>(request: OnyxRequest<TKey>, type: ApiRequestType): Promise<void | Response<TKey>> {
function processRequest<TKey extends OnyxKey>(request: OnyxRequest<TKey>, type: ApiRequestType): Promise<void | Response<TKey>> {
Log.info('[API] Processing request', false, {command: request.command, type});
// Write commands can be saved and retried, so push it to the SequentialQueue
if (type === CONST.API_REQUEST_TYPE.WRITE) {
Log.info('[API] Write command. Pushing to SequentialQueue', false, {command: request.command});
await pushToSequentialQueue(request);
return;
pushToSequentialQueue(request);
return Promise.resolve();
}

// Read requests are processed right away, but don't return the response to the caller
Expand Down Expand Up @@ -167,7 +164,6 @@ function write<TCommand extends WriteCommand, TKey extends OnyxKey>(
): Promise<void | Response<TKey>> {
Log.info('[API] Called API write', false, {command, ...apiCommandParameters});
const request = prepareRequest(command, CONST.API_REQUEST_TYPE.WRITE, apiCommandParameters, onyxData, conflictResolver);

return processRequest(request, CONST.API_REQUEST_TYPE.WRITE);
}

Expand Down
28 changes: 11 additions & 17 deletions src/libs/Network/SequentialQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ onReconnection(flush);
// Flush the queue when the persisted requests are initialized
onPersistedRequestsInitialization(flush);

async function handleConflictActions<TKey extends OnyxKey>(conflictAction: ConflictData, newRequest: OnyxRequest<TKey>): Promise<void> {
function handleConflictActions<TKey extends OnyxKey>(conflictAction: ConflictData, newRequest: OnyxRequest<TKey>) {
Log.info('[SequentialQueue] handleConflictActions', false, {
conflictType: conflictAction.type,
newCommand: newRequest.command,
Expand All @@ -447,34 +447,34 @@ async function handleConflictActions<TKey extends OnyxKey>(conflictAction: Confl
Log.info('[SequentialQueue] Conflict resolution: PUSH', false, {
command: newRequest.command,
});
await savePersistedRequest(newRequest);
savePersistedRequest(newRequest);
} else if (conflictAction.type === 'replace') {
Log.info('[SequentialQueue] Conflict resolution: REPLACE', false, {
command: newRequest.command,
replaceIndex: conflictAction.index,
replacementRequest: conflictAction.request?.command ?? newRequest.command,
});
await updatePersistedRequest(conflictAction.index, conflictAction.request ?? (newRequest as AnyRequest));
updatePersistedRequest(conflictAction.index, conflictAction.request ?? (newRequest as AnyRequest));
} else if (conflictAction.type === 'delete') {
Log.info('[SequentialQueue] Conflict resolution: DELETE', false, {
command: newRequest.command,
deleteIndices: conflictAction.indices,
willPushNewRequest: conflictAction.pushNewRequest ?? false,
hasNextAction: !!conflictAction.nextAction,
});
await deletePersistedRequestsByIndices(conflictAction.indices);
deletePersistedRequestsByIndices(conflictAction.indices);
if (conflictAction.pushNewRequest) {
Log.info('[SequentialQueue] Pushing new request after delete', false, {
command: newRequest.command,
});
await savePersistedRequest(newRequest);
savePersistedRequest(newRequest);
}
if (conflictAction.nextAction) {
Log.info('[SequentialQueue] Processing next conflict action', false, {
command: newRequest.command,
nextActionType: conflictAction.nextAction.type,
});
await handleConflictActions(conflictAction.nextAction, newRequest);
handleConflictActions(conflictAction.nextAction, newRequest);
}
} else {
Log.info('[SequentialQueue] No action performed, request ignored', false, {
Expand All @@ -484,7 +484,7 @@ async function handleConflictActions<TKey extends OnyxKey>(conflictAction: Confl
}
}

function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> {
function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>) {
const currentRequests = getAllPersistedRequests();
Log.info('[SequentialQueue] push() called', false, {
command: newRequest.command,
Expand All @@ -494,11 +494,6 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
isSequentialQueueRunning,
});

// Save the request to the persisted queue. The in-memory update inside save()
// happens synchronously, so flush() below will see the new request immediately.
// The returned promise resolves when disk persistence completes.
let persistencePromise: Promise<void>;

if (newRequest.checkAndFixConflictingRequest) {
const requests = currentRequests;
Log.info('[SequentialQueue] Checking for conflicts', false, {
Expand All @@ -515,13 +510,13 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
// don't try to serialize a function.
// eslint-disable-next-line no-param-reassign
delete newRequest.checkAndFixConflictingRequest;
persistencePromise = handleConflictActions(conflictAction, newRequest);
handleConflictActions(conflictAction, newRequest);
} else {
Log.info('[SequentialQueue] No conflict action. Adding request to Persisted Requests', false, {
command: newRequest.command,
});
// Add request to Persisted Requests so that it can be retried if it fails
persistencePromise = savePersistedRequest(newRequest);
savePersistedRequest(newRequest);
}

// If we are offline we don't need to trigger the queue to empty as it will happen when we come back online
Expand All @@ -530,7 +525,7 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
command: newRequest.command,
queueLength: getAllPersistedRequests().length,
});
return persistencePromise;
return;
}

// If the queue is running this request will run once it has finished processing the current batch
Expand All @@ -544,14 +539,13 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
});
flush(true);
});
return persistencePromise;
return;
}

Log.info('[SequentialQueue] Queue is not running. Flushing the queue.', false, {
command: newRequest.command,
});
flush(true);
return persistencePromise;
}

function getCurrentRequest(): Promise<void> {
Expand Down
88 changes: 18 additions & 70 deletions src/libs/actions/PersistedRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {deepEqual} from 'fast-equals';
import type {OnyxKey} from 'react-native-onyx';
import Onyx from 'react-native-onyx';
import Log from '@libs/Log';
import CONST from '@src/CONST';
import ONYXKEYS from '@src/ONYXKEYS';
import type {Request} from '@src/types/onyx';
import type {AnyRequest} from '@src/types/onyx/Request';
Expand All @@ -11,10 +10,6 @@ let persistedRequests: AnyRequest[] = [];
let ongoingRequest: AnyRequest | null = null;
let pendingSaveOperations: AnyRequest[] = [];
let isInitialized = false;
// Tracks all requestIDs this tab has ever seen (from disk init, save(), or other tabs).
// Used to distinguish stale own-write callbacks (ignore) from new requests enqueued
// by other browser tabs (merge into memory).
const knownRequestIDs = new Set<number>();
let initializationCallback: () => void;
function triggerInitializationCallback() {
if (typeof initializationCallback !== 'function') {
Expand All @@ -31,39 +26,10 @@ function onInitialization(callbackFunction: () => void) {
Onyx.connectWithoutView({
key: ONYXKEYS.PERSISTED_REQUESTS,
callback: (val) => {
Log.info('[PersistedRequests] hit Onyx connect callback', false, {isValNullish: val == null, isInitialized});

// After initialization, in-memory is authoritative — ignore stale disk
// callbacks to prevent out-of-order Onyx.set() from overwriting the
// correct in-memory state (Bug #80759 Issue 4).
// Exception 1: Onyx.clear() fires callback with null — allow through.
// Exception 2: Other browser tabs can enqueue requests. We detect these
// by checking for requestIDs not in knownRequestIDs, and merge them in.
if (isInitialized && val != null) {
const newFromOtherTabs = val.filter((r) => r.requestID != null && !knownRequestIDs.has(r.requestID));
if (newFromOtherTabs.length > 0) {
Log.info('[PersistedRequests] Merging requests from other tabs', false, {
newCount: newFromOtherTabs.length,
newCommands: getCommands(newFromOtherTabs),
});
for (const r of newFromOtherTabs) {
knownRequestIDs.add(r.requestID ?? CONST.DEFAULT_NUMBER_ID);
}
persistedRequests = [...persistedRequests, ...newFromOtherTabs];
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests);
}
return;
}

Log.info('[PersistedRequests] hit Onyx connect callback', false, {isValNullish: val == null});
const previousInMemoryRequests = [...persistedRequests];
const diskRequests = val ?? [];
persistedRequests = diskRequests;
for (const r of diskRequests) {
if (r.requestID == null) {
continue;
}
knownRequestIDs.add(r.requestID);
}

Log.info('[PersistedRequests] DISK vs MEMORY comparison', false, {
diskRequestsLength: diskRequests.length,
Expand All @@ -78,11 +44,6 @@ Onyx.connectWithoutView({
Log.info(`[PersistedRequests] Processing pending save operations, size: ${pendingSaveOperations.length}`, false, {
pendingCommands: getCommands(pendingSaveOperations),
});
for (const r of pendingSaveOperations) {
if (r.requestID != null) {
knownRequestIDs.add(r.requestID);
}
}
const requests = [...persistedRequests, ...pendingSaveOperations];
persistedRequests = requests;
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests);
Expand Down Expand Up @@ -134,9 +95,6 @@ Onyx.connectWithoutView({
*/
function clear() {
ongoingRequest = null;
persistedRequests = [];
pendingSaveOperations = [];
knownRequestIDs.clear();
Onyx.set(ONYXKEYS.PERSISTED_ONGOING_REQUESTS, null);
return Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, []);
}
Expand All @@ -146,7 +104,7 @@ function getLength(): number {
return persistedRequests.length + (ongoingRequest ? 1 : 0);
}

function save<TKey extends OnyxKey>(requestToPersist: Request<TKey>): Promise<void> {
function save<TKey extends OnyxKey>(requestToPersist: Request<TKey>) {
Log.info('[PersistedRequests] Saving request to queue started', false, {
command: requestToPersist.command,
currentQueueLength: persistedRequests.length,
Expand All @@ -159,24 +117,21 @@ function save<TKey extends OnyxKey>(requestToPersist: Request<TKey>): Promise<vo
pendingSaveOperationsLength: pendingSaveOperations.length,
});
pendingSaveOperations.push(requestToPersist as AnyRequest);
return Promise.resolve();
return;
}

// If the command is not in the keepLastInstance array, add the new request as usual
const requests = [...persistedRequests, requestToPersist];
const previousLength = persistedRequests.length;
persistedRequests = requests as AnyRequest[];
if (requestToPersist.requestID != null) {
knownRequestIDs.add(requestToPersist.requestID);
}

Log.info('[PersistedRequests] Request added to memory, persisting to disk', false, {
command: requestToPersist.command,
previousQueueLength: previousLength,
newQueueLength: requests.length,
});

return Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests as AnyRequest[])
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests as AnyRequest[])
.then(() => {
Log.info('[PersistedRequests] Request successfully persisted to disk', false, {
command: requestToPersist.command,
Expand Down Expand Up @@ -247,29 +202,26 @@ function endRequestAndRemoveFromQueue<TKey extends OnyxKey>(requestToRemove: Req
});
}

function deleteRequestsByIndices(indices: number[]): Promise<void> {
function deleteRequestsByIndices(indices: number[]) {
// Create a Set from the indices array for efficient lookup
const indicesSet = new Set(indices);

// Create a new array excluding elements at the specified indices
persistedRequests = persistedRequests.filter((_, index) => !indicesSet.has(index));

// Update the persisted requests in storage or state as necessary
return Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests).then(() => {
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests).then(() => {
Log.info(`Multiple (${indices.length}) requests removed from the queue. Queue length is ${persistedRequests.length}`);
});
}

function update<TKey extends OnyxKey>(oldRequestIndex: number, newRequest: Request<TKey>): Promise<void> {
function update<TKey extends OnyxKey>(oldRequestIndex: number, newRequest: Request<TKey>) {
const requests = [...persistedRequests];
const oldRequest = requests.at(oldRequestIndex);
Log.info('[PersistedRequests] Updating a request', false, {oldRequest, newRequest, oldRequestIndex});
requests.splice(oldRequestIndex, 1, newRequest as AnyRequest);
persistedRequests = requests;
if (newRequest.requestID != null) {
knownRequestIDs.add(newRequest.requestID);
}
return Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests);
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests);
}

function updateOngoingRequest<TKey extends OnyxKey>(newRequest: Request<TKey>) {
Expand Down Expand Up @@ -318,13 +270,16 @@ function processNextRequest(): AnyRequest | null {
newQueueLength: persistedRequests.length,
});

// Persist both the updated queue and the ongoing request to disk atomically.
// This ensures that if the app crashes mid-flight, the ongoing request is not
// lost (Bug #80759 Issue 3a) and the queue on disk matches memory (Issue 3c).
Onyx.multiSet({
[ONYXKEYS.PERSISTED_REQUESTS]: persistedRequests,
...(ongoingRequest ? {[ONYXKEYS.PERSISTED_ONGOING_REQUESTS]: ongoingRequest} : {}),
});
if (ongoingRequest && ongoingRequest.persistWhenOngoing) {
Log.info('[PersistedRequests] Persisting ongoingRequest to disk', false, {
command: ongoingRequest.command,
});
Onyx.set(ONYXKEYS.PERSISTED_ONGOING_REQUESTS, ongoingRequest);
} else {
Log.info('[PersistedRequests] NOT persisting ongoingRequest to disk (persistWhenOngoing=false)', false, {
command: ongoingRequest?.command ?? 'null',
});
}

return ongoingRequest;
}
Expand Down Expand Up @@ -359,13 +314,6 @@ function rollbackOngoingRequest() {
newQueueLength: persistedRequests.length,
ongoingRequestCleared: true,
});

// Persist both changes to disk so a crash after rollback doesn't lose
// the rolled-back request or leave a stale ongoingRequest on disk.
Onyx.multiSet({
[ONYXKEYS.PERSISTED_REQUESTS]: persistedRequests,
[ONYXKEYS.PERSISTED_ONGOING_REQUESTS]: null,
});
}

function getAll(): AnyRequest[] {
Expand Down
Loading
Loading