From 56f2c5e387938c39efdac1b382768785be16139a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 4 May 2026 13:03:26 +0200 Subject: [PATCH 01/12] chore: bugfix for two way no merge and better error handling when merging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../jobs/integrationResultsReporting.job.ts | 29 +++++++++++++------ .../src/service/activity.service.ts | 20 +++++++++++++ .../src/service/member.service.ts | 3 +- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts b/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts index 374273868b..ec68a769b5 100644 --- a/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts +++ b/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts @@ -62,23 +62,34 @@ const job: IJobDefinition = { ) ).count - // Break down errors by errorMessage + location, enriched with platform info + // Break down errors by errorMessage + location, enriched with platform info. + // When a mergeError is present in metadata, prefer its errorMessage for grouping + // so merge crashes surface as distinct groups rather than collapsing into the + // generic outer errorMessage. const errorGroups = await dbConnection.any( ` SELECT - COALESCE(r.error->>'errorMessage', '[no errorMessage]') AS "errorMessage", - COALESCE(r.error->>'location', '[no location]') AS location, - count(*)::int AS count, - round(avg(r.retries), 1)::float AS "avgRetries", - max(r.retries)::int AS "maxRetries", - min(r."createdAt") AS oldest, - max(r."updatedAt") AS newest, + COALESCE( + r.error->'metadata'->>'errorMessage', + r.error->>'errorMessage', + '[no errorMessage]' + ) AS "errorMessage", + COALESCE(r.error->>'location', '[no location]') AS location, + count(*)::int AS count, + round(avg(r.retries), 1)::float AS "avgRetries", + max(r.retries)::int AS "maxRetries", + min(r."createdAt") AS oldest, + max(r."updatedAt") AS newest, string_agg(DISTINCT i.platform, ', ' ORDER BY i.platform) AS platforms FROM integration.results r LEFT JOIN integrations i ON i.id = r."integrationId" WHERE r.state = 'error' GROUP BY - r.error->>'errorMessage', + COALESCE( + r.error->'metadata'->>'errorMessage', + r.error->>'errorMessage', + '[no errorMessage]' + ), r.error->>'location' ORDER BY count DESC LIMIT 20 diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 1ca2d5a4c1..b2ff1645c7 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1768,6 +1768,20 @@ export default class ActivityService extends LoggerBase { } } + if ( + metadata.memberWithIdentity && + metadata.memberIdToUpdate && + metadata.memberWithIdentity === metadata.memberIdToUpdate + ) { + // The member already owns the conflicting identity — stale prefetch race. + // The identity is already present so treat this as a no-op success. + this.log.warn( + { memberId: metadata.memberIdToUpdate, identity: metadata.erroredVerifiedIdentity }, + 'Verified identity already belongs to this member (stale prefetch) — treating as success', + ) + return metadata.memberIdToUpdate as string + } + if ( metadata.memberWithIdentity && metadata.memberIdToUpdate && @@ -1789,6 +1803,7 @@ export default class ActivityService extends LoggerBase { return originalId } else { metadata.noMerge = true + metadata.errorMessage = 'noMerge blocked — verified identity conflict' } } catch (err) { metadata.mergeError = { @@ -1796,9 +1811,14 @@ export default class ActivityService extends LoggerBase { errorStack: err?.stack, err, } + metadata.errorMessage = 'merge failed — auto-merge threw an error' } } + if (!metadata.errorMessage) { + metadata.errorMessage = 'verified identity conflict — identity owner not found' + } + return metadata } diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 63ad00480b..ea6d2c228b 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -84,8 +84,7 @@ export async function mergeIfAllowed( secondaryId: string, ): Promise { const noMergeMemberIds = await getMemberNoMerge(pgQx, [primaryId, secondaryId]) - const noMerge = singleOrDefault( - noMergeMemberIds, + const noMerge = noMergeMemberIds.some( (m) => (m.memberId === primaryId && m.noMergeId === secondaryId) || (m.memberId === secondaryId && m.noMergeId === primaryId), From 29bcdda69c161644ea5f3516ef312908115ea797 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 5 May 2026 08:20:20 +0200 Subject: [PATCH 02/12] chore: small refactor to cover what happens after the member merge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../jobs/integrationResultsReporting.job.ts | 6 +- .../src/service/activity.service.ts | 14 +- .../src/service/member.service.ts | 281 +++++++----------- 3 files changed, 120 insertions(+), 181 deletions(-) diff --git a/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts b/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts index ec68a769b5..afa6951014 100644 --- a/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts +++ b/services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts @@ -63,9 +63,9 @@ const job: IJobDefinition = { ).count // Break down errors by errorMessage + location, enriched with platform info. - // When a mergeError is present in metadata, prefer its errorMessage for grouping - // so merge crashes surface as distinct groups rather than collapsing into the - // generic outer errorMessage. + // Prefer metadata.errorMessage when set — the data sink worker writes specific + // values there (e.g. "noMerge blocked", "merge failed") so those surface as + // distinct groups rather than collapsing into the generic outer errorMessage. const errorGroups = await dbConnection.any( ` SELECT diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index b2ff1645c7..beccfec318 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1346,8 +1346,11 @@ export default class ActivityService extends LoggerBase { orgPromiseCache, payload.activity.timestamp, ) - .then(() => { - payload.memberId = payload.dbMember.id + .then((redirectId?: string) => { + payload.memberId = redirectId ?? payload.dbMember.id + if (redirectId) { + memberMap.set(key, redirectId) + } }) .catch(async (err) => { const result = await this.handleMemberIdentityError( @@ -1405,8 +1408,11 @@ export default class ActivityService extends LoggerBase { orgPromiseCache, payload.activity.timestamp, ) - .then(() => { - payload.objectMemberId = payload.dbObjectMember.id + .then((redirectId?: string) => { + payload.objectMemberId = redirectId ?? payload.dbObjectMember.id + if (redirectId) { + memberMap.set(key, redirectId) + } }) .catch(async (err) => { const result = await this.handleMemberIdentityError( diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index ea6d2c228b..15daf5d603 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -23,7 +23,6 @@ import { findIdentitiesForMembers, findMemberIdByVerifiedIdentity, findMembersByVerifiedUsernames, - moveToNewMember, } from '@crowd/data-access-layer' import { DbStore } from '@crowd/data-access-layer/src/database' import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge' @@ -118,6 +117,73 @@ export default class MemberService extends LoggerBase { this.pgQx = dbStoreQx(this.store) } + /** + * Inserts identities with failOnConflict=true. On a unique constraint violation: finds the + * owner and, when attemptMerge=true (update path), attempts mergeIfAllowed. On merge success + * returns the owner's memberId as a redirect signal. On failure re-throws the original + * DatabaseError so handleMemberIdentityError in activity.service.ts can extract metadata and + * write to integration.results.error. When attemptMerge=false (create path) the orphan has no + * data (INSERT was atomic — nothing committed), so we skip the full merge and just return the + * owner; the caller handles orphan cleanup via scheduleOrphanMemberDeletion. + */ + private async insertIdentitiesWithConflictResolution( + memberId: string, + integrationId: string, + identities: IMemberIdentity[], + attemptMerge = true, + ): Promise { + try { + await this.memberRepo.insertIdentities(memberId, integrationId, identities, true) + } catch (err) { + if ( + !err?.constraint || + err.constraint !== 'uix_memberIdentities_platform_value_type_verified' || + !err.detail + ) { + throw err + } + const match = (err.detail as string).match(/\(platform, value, type\)=\((.*?)\)/) + if (!match) throw err + const [platform, value, type] = match[1].split(',').map((s: string) => s.trim()) + + const owner = await findMemberIdByVerifiedIdentity( + this.pgQx, + platform, + value, + type as MemberIdentityType, + ) + + if (owner && owner !== memberId) { + if (!attemptMerge) { + return owner + } + + let merged: boolean + try { + merged = await mergeIfAllowed(this.pgQx, this.temporal, this.log, owner, memberId) + } catch (mergeErr) { + // Re-throw the original constraint error, not the merge error. If we let the merge + // error propagate, handleMemberIdentityError's checkForIdentityConstraint would + // return false and no metadata would be stored in integration.results.error. + // Re-throwing the constraint error lets handleMemberIdentityError retry the merge. + this.log.warn( + mergeErr, + { memberId, owner, platform, value, type }, + 'merge threw during identity conflict — re-throwing constraint error for retry', + ) + throw err + } + if (merged) return owner + } + + // noMerge, owner not found, or stale prefetch (owner === memberId): + // Re-throw original constraint DatabaseError. handleMemberIdentityError will call + // mergeIfAllowed again — for noMerge this is idempotent (one extra getMemberNoMerge + // query); for merge errors it provides a natural retry. + throw err + } + } + public async create( segmentIds: string[], integrationId: string, @@ -205,12 +271,18 @@ export default class MemberService extends LoggerBase { 'memberService -> create -> createMember', ) - let insertedCount: number + let redirectId: string | void try { - insertedCount = await logExecutionTimeV2( - () => this.memberRepo.insertIdentities(id, integrationId, data.identities), + redirectId = await logExecutionTimeV2( + () => + this.insertIdentitiesWithConflictResolution( + id, + integrationId, + data.identities, + false, + ), this.log, - 'memberService -> create -> insertIdentities', + 'memberService -> create -> insertIdentitiesWithConflictResolution', ) } catch (err) { this.log.error(err, { memberId: id }, 'Error inserting member identities!') @@ -222,157 +294,17 @@ export default class MemberService extends LoggerBase { throw err } - if (insertedCount < data.identities.length) { - // At least one verified identity conflicted. Walk every verified identity to: - // (a) find the existing member(s) that own the conflicting ones, and - // (b) collect identities that were successfully inserted for the orphan. - let existingMemberId: string | null = null - const orphanVerifiedIdentities: IMemberIdentity[] = [] - - for (const identity of data.identities.filter((i) => i.verified)) { - const owner = await findMemberIdByVerifiedIdentity( - this.pgQx, - identity.platform, - identity.value, - identity.type, - ) - - if (!owner) { - // The identity disappeared between INSERT and SELECT — unusual race condition. - // Cannot safely resolve; schedule orphan deletion and throw. - this.log.error( - { orphanMemberId: id, identity }, - 'Verified identity not found after conflict detection — scheduling orphan deletion', - ) - await this.scheduleOrphanMemberDeletion(id) - throw new ApplicationError( - `Identity conflict during member creation: owner not found for identity (${identity.platform}, ${identity.value}, ${identity.type})`, - ) - } else if (owner === id) { - // Successfully inserted for the orphan — will be moved to the existing member below - orphanVerifiedIdentities.push(identity) - } else if (!existingMemberId) { - // First conflicting owner found - existingMemberId = owner - } else if (existingMemberId !== owner) { - // A second conflicting owner — two existing members each own a different verified - // identity of this incoming member, so the data source asserts they are the same - // person. Auto-merge the second into the first. - this.log.warn( - { - orphanMemberId: id, - primaryMemberId: existingMemberId, - secondaryMemberId: owner, - identity, - }, - 'Multiple conflicting verified identities belong to different existing members — merging automatically', - ) - let merged: boolean - try { - merged = await mergeIfAllowed( - this.pgQx, - this.temporal, - this.log, - existingMemberId, - owner, - ) - } catch (mergeErr) { - this.log.error( - mergeErr, - { - orphanMemberId: id, - primaryMemberId: existingMemberId, - secondaryMemberId: owner, - }, - 'Auto-merge of conflicting members failed — scheduling orphan deletion', - ) - await this.scheduleOrphanMemberDeletion(id) - throw new ApplicationError( - `Identity conflict during member creation: auto-merge of members ${existingMemberId} and ${owner} failed for identity (${identity.platform}, ${identity.value}, ${identity.type})`, - ) - } - if (!merged) { - this.log.error( - { - orphanMemberId: id, - primaryMemberId: existingMemberId, - secondaryMemberId: owner, - }, - 'Auto-merge prevented by noMerge record — scheduling orphan deletion', - ) - await this.scheduleOrphanMemberDeletion(id) - throw new ApplicationError( - `Identity conflict during member creation: members ${existingMemberId} and ${owner} are marked as no-merge but share identity (${identity.platform}, ${identity.value}, ${identity.type})`, - ) - } - // existingMemberId (primary) survives; owner (secondary) was absorbed - this.log.info( - { - orphanMemberId: id, - survivingMemberId: existingMemberId, - mergedMemberId: owner, - identity, - }, - 'Auto-merge of conflicting members succeeded', - ) - } - // else: owner === existingMemberId — same member owns this identity too, nothing to do - } - - if (existingMemberId) { - // Move any verified identities that were inserted for the orphan to the existing - // member so they are not lost when the orphan is cascade-deleted. - // UPDATE memberId rather than INSERT to avoid unique constraint violations. - for (const identity of orphanVerifiedIdentities) { - try { - await moveToNewMember(this.pgQx, { - oldMemberId: id, - newMemberId: existingMemberId, - platform: identity.platform, - value: identity.value, - type: identity.type, - }) - } catch (moveErr) { - this.log.error( - moveErr, - { orphanMemberId: id, existingMemberId, identity }, - 'Failed to move orphan verified identity to existing member — scheduling orphan deletion', - ) - await this.scheduleOrphanMemberDeletion(id) - throw new ApplicationError( - `Failed to move identity (${identity.platform}, ${identity.value}, ${identity.type}) from orphan ${id} to existing member ${existingMemberId}`, - ) - } - } - this.log.warn( - { - orphanMemberId: id, - existingMemberId, - transferredIdentities: orphanVerifiedIdentities.length, - }, - 'Identity conflict during member creation — reusing existing member, scheduling orphan deletion', - ) - await logExecutionTimeV2( - () => this.memberRepo.addToSegments(existingMemberId, segmentIds), - this.log, - 'memberService -> create -> addToSegments (conflict path)', - ) - if (releaseMemberLock) { - await releaseMemberLock() - } - await this.scheduleOrphanMemberDeletion(id) - return existingMemberId - } - - // insertedCount < data.identities.length but no conflicting owner found — unexpected - this.log.error( - { memberId: id }, - 'Identity conflict during member creation but existing member not found — scheduling orphan deletion', + if (redirectId) { + await logExecutionTimeV2( + () => this.memberRepo.addToSegments(redirectId, segmentIds), + this.log, + 'memberService -> create -> addToSegments (conflict redirect)', ) + if (releaseMemberLock) { + await releaseMemberLock() + } await this.scheduleOrphanMemberDeletion(id) - throw new ApplicationError( - `Identity conflict during member creation for member ${id}: inserted ${insertedCount} of ${data.identities.length} identities but found no conflicting owner`, - ) + return redirectId } try { @@ -513,8 +445,8 @@ export default class MemberService extends LoggerBase { releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, activityTimestamp?: string, - ): Promise { - await logExecutionTimeV2( + ): Promise { + return logExecutionTimeV2( async () => { this.log.trace({ memberId: id }, 'Updating a member!') @@ -600,23 +532,22 @@ export default class MemberService extends LoggerBase { ) } + let effectiveMemberId = id + if (identitiesToCreate) { this.log.trace({ memberId: id }, 'Inserting new identities!') - try { - await logExecutionTimeV2( - () => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate, true), - this.log, - 'memberService -> update -> insertIdentities', - ) - } catch (err) { - throw new ApplicationError( - 'Error while inserting member identities for an existing member!', - err, - ) + const redirectId = await logExecutionTimeV2( + () => + this.insertIdentitiesWithConflictResolution(id, integrationId, identitiesToCreate), + this.log, + 'memberService -> update -> insertIdentitiesWithConflictResolution', + ) + if (redirectId) { + effectiveMemberId = redirectId } } - if (identitiesToUpdate) { + if (effectiveMemberId === id && identitiesToUpdate) { this.log.trace({ memberId: id }, 'Updating identities!') try { await logExecutionTimeV2( @@ -638,7 +569,7 @@ export default class MemberService extends LoggerBase { if (this.botDetectionService.isFlaggedAsBot(toUpdate.attributes)) { this.log.debug({ memberId: id }, 'Skipping organization creation for bot member') - return + return effectiveMemberId !== id ? effectiveMemberId : undefined } const organizations = [] @@ -690,7 +621,7 @@ export default class MemberService extends LoggerBase { integrationId, emailIdentities.map((i) => i.value), orgPromiseCache, - id, + effectiveMemberId, activityTimestamp, ), this.log, @@ -704,14 +635,14 @@ export default class MemberService extends LoggerBase { if (organizations.length > 0) { const uniqOrgs = uniqby(organizations, 'id') - this.log.trace({ memberId: id }, 'Finding member organizations!') + this.log.trace({ memberId: effectiveMemberId }, 'Finding member organizations!') const orgsToAdd = ( await Promise.all( uniqOrgs.map(async (org) => { // Check if the org was already added to the member in the past, including deleted ones. // If it was, we ignore this org to prevent from adding it again. const existingMemberOrgs = await logExecutionTimeV2( - () => orgService.findMemberOrganizations(id, org.id), + () => orgService.findMemberOrganizations(effectiveMemberId, org.id), this.log, 'memberService -> update -> findMemberOrganizations', ) @@ -721,14 +652,16 @@ export default class MemberService extends LoggerBase { ).filter((org) => org !== null) if (orgsToAdd.length > 0) { - this.log.trace({ memberId: id }, 'Adding organizations to member!') + this.log.trace({ memberId: effectiveMemberId }, 'Adding organizations to member!') await logExecutionTimeV2( - () => orgService.addToMember([segmentId], id, orgsToAdd), + () => orgService.addToMember([segmentId], effectiveMemberId, orgsToAdd), this.log, 'memberService -> update -> addToMember', ) } } + + return effectiveMemberId !== id ? effectiveMemberId : undefined } catch (err) { this.log.error(err, { memberId: id }, 'Error while updating a member!') throw err From 990dca51ea8da2be50bdd1067780b0e4ad886c0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 5 May 2026 09:39:17 +0200 Subject: [PATCH 03/12] chore: try to insert identities and merge until we are done MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/service/activity.service.ts | 14 +++- .../src/service/member.service.ts | 84 +++++++++++++++---- 2 files changed, 78 insertions(+), 20 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index beccfec318..70084233fa 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1196,7 +1196,6 @@ export default class ActivityService extends LoggerBase { reach: value.member.reach, }, value.platform, - undefined, orgPromiseCache, value.timestamp, ) @@ -1342,7 +1341,6 @@ export default class ActivityService extends LoggerBase { payload.dbMember, dbMemberIdentities.get(payload.dbMember.id), payload.platform, - undefined, orgPromiseCache, payload.activity.timestamp, ) @@ -1404,7 +1402,6 @@ export default class ActivityService extends LoggerBase { payload.dbObjectMember, dbMemberIdentities.get(payload.dbObjectMember.id), payload.platform, - undefined, orgPromiseCache, payload.activity.timestamp, ) @@ -1828,6 +1825,17 @@ export default class ActivityService extends LoggerBase { return metadata } + if (error instanceof ApplicationError && error.metadata?.mergeCount !== undefined) { + return { + ...error.metadata, + errorMessage: error.message, + memberType, + memberIdToUpdate: dbMember?.id, + memberSource: + memberType === 'member' ? payload.dbMemberSource : payload.dbObjectMemberSource, + } + } + if (error instanceof ApplicationError) { let nextError: any = error.originalError diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 15daf5d603..99577c7ba4 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -184,12 +184,65 @@ export default class MemberService extends LoggerBase { } } + /** + * After an initial identity conflict redirects to a surviving member, inserts all incoming + * identities that the surviving member doesn't already have. If a further conflict triggers + * another merge the loop follows the new survivor. Throws ApplicationError once maxMerges + * is reached so the error surfaces in integration.results. + */ + private async syncIdentitiesAfterRedirect( + survivingId: string, + integrationId: string, + incomingIdentities: IMemberIdentity[], + maxMerges = 2, + ): Promise { + for (let mergeCount = 0; ; mergeCount++) { + const freshMap = await findIdentitiesForMembers(this.pgQx, [survivingId]) + const freshIdentities = freshMap.get(survivingId) ?? [] + + const toInsert = incomingIdentities.filter( + (incoming) => + !freshIdentities.some( + (existing) => + existing.platform === incoming.platform && + existing.value === incoming.value && + existing.type === incoming.type && + existing.verified === incoming.verified, + ), + ) + + if (toInsert.length === 0) { + return survivingId + } + + const nextRedirectId = await this.insertIdentitiesWithConflictResolution( + survivingId, + integrationId, + toInsert, + true, + ) + + if (!nextRedirectId) { + return survivingId + } + + if (mergeCount >= maxMerges) { + throw new ApplicationError('identity sync exceeded merge limit', undefined, { + mergeCount: mergeCount + 1, + survivingId, + maxMerges, + }) + } + + survivingId = nextRedirectId + } + } + public async create( segmentIds: string[], integrationId: string, data: IMemberCreateData, platform: PlatformType, - releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, activityTimestamp?: string, ): Promise { @@ -295,16 +348,18 @@ export default class MemberService extends LoggerBase { } if (redirectId) { + await this.scheduleOrphanMemberDeletion(id) + const finalMemberId = await this.syncIdentitiesAfterRedirect( + redirectId, + integrationId, + data.identities, + ) await logExecutionTimeV2( - () => this.memberRepo.addToSegments(redirectId, segmentIds), + () => this.memberRepo.addToSegments(finalMemberId, segmentIds), this.log, 'memberService -> create -> addToSegments (conflict redirect)', ) - if (releaseMemberLock) { - await releaseMemberLock() - } - await this.scheduleOrphanMemberDeletion(id) - return redirectId + return finalMemberId } try { @@ -323,10 +378,6 @@ export default class MemberService extends LoggerBase { throw err } - if (releaseMemberLock) { - await releaseMemberLock() - } - // we should prevent organization creation for bot members if (botDetection === MemberBotDetection.CONFIRMED_BOT) { this.log.debug('Skipping organization creation for bot member') @@ -442,7 +493,6 @@ export default class MemberService extends LoggerBase { original: IDbMember, originalIdentities: IMemberIdentity[], platform: PlatformType, - releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, activityTimestamp?: string, ): Promise { @@ -543,7 +593,11 @@ export default class MemberService extends LoggerBase { 'memberService -> update -> insertIdentitiesWithConflictResolution', ) if (redirectId) { - effectiveMemberId = redirectId + effectiveMemberId = await this.syncIdentitiesAfterRedirect( + redirectId, + integrationId, + identitiesToCreate, + ) } } @@ -563,10 +617,6 @@ export default class MemberService extends LoggerBase { } } - if (releaseMemberLock) { - await releaseMemberLock() - } - if (this.botDetectionService.isFlaggedAsBot(toUpdate.attributes)) { this.log.debug({ memberId: id }, 'Skipping organization creation for bot member') return effectiveMemberId !== id ? effectiveMemberId : undefined From 01d19fbfdf0230c50ea8d8e696496c5abc15e27a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 5 May 2026 10:29:35 +0200 Subject: [PATCH 04/12] fix: max merge check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/apps/data_sink_worker/src/service/member.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 99577c7ba4..b3ee3fd209 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -226,7 +226,7 @@ export default class MemberService extends LoggerBase { return survivingId } - if (mergeCount >= maxMerges) { + if (mergeCount + 1 >= maxMerges) { throw new ApplicationError('identity sync exceeded merge limit', undefined, { mergeCount: mergeCount + 1, survivingId, From 82a4065905a4d55eb6456755a55906973518f12a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 5 May 2026 11:10:50 +0200 Subject: [PATCH 05/12] fix: fixed issue with how we handle identities after merge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/service/activity.service.ts | 19 ++++++ .../src/service/member.service.ts | 59 +++++++++++++++---- 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 70084233fa..36e7da2bf0 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1836,6 +1836,25 @@ export default class ActivityService extends LoggerBase { } } + // syncIdentitiesAfterRedirect wraps constraint errors with the current survivingId when the + // original member has already been absorbed by a prior merge. Unwrap and re-handle using a + // synthetic dbMember with the surviving ID so mergeIfAllowed targets the right member. + if ( + error instanceof ApplicationError && + error.metadata?.survivingId !== undefined && + error.originalError + ) { + const survivingDbMember = dbMember + ? { ...dbMember, id: error.metadata.survivingId as string } + : undefined + return this.handleMemberIdentityError( + error.originalError, + payload, + memberType, + survivingDbMember, + ) + } + if (error instanceof ApplicationError) { let nextError: any = error.originalError diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index b3ee3fd209..114f193304 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -191,36 +191,73 @@ export default class MemberService extends LoggerBase { * is reached so the error surfaces in integration.results. */ private async syncIdentitiesAfterRedirect( - survivingId: string, + initialSurvivingId: string, integrationId: string, incomingIdentities: IMemberIdentity[], maxMerges = 2, ): Promise { + let survivingId = initialSurvivingId + for (let mergeCount = 0; ; mergeCount++) { const freshMap = await findIdentitiesForMembers(this.pgQx, [survivingId]) const freshIdentities = freshMap.get(survivingId) ?? [] + // Match on (platform, value, type) only — per-member unique index excludes verified. + // Inserting an identity that already exists with a different verified flag would hit + // uix_memberIdentities_memberId_platform_value_type. const toInsert = incomingIdentities.filter( (incoming) => !freshIdentities.some( + (existing) => + existing.platform === incoming.platform && + existing.value === incoming.value && + existing.type === incoming.type, + ), + ) + + // Promote verified=false → true for identities already on the surviving member + // where the incoming payload asserts verified=true. + const toVerify = incomingIdentities.filter( + (incoming) => + incoming.verified && + freshIdentities.some( (existing) => existing.platform === incoming.platform && existing.value === incoming.value && existing.type === incoming.type && - existing.verified === incoming.verified, + !existing.verified, ), ) + if (toVerify.length > 0) { + await this.memberRepo.updateIdentities(survivingId, toVerify) + } + if (toInsert.length === 0) { return survivingId } - const nextRedirectId = await this.insertIdentitiesWithConflictResolution( - survivingId, - integrationId, - toInsert, - true, - ) + let nextRedirectId: string | void + try { + nextRedirectId = await this.insertIdentitiesWithConflictResolution( + survivingId, + integrationId, + toInsert, + true, + ) + } catch (err) { + // If we've already followed at least one redirect the original member id is stale. + // Wrap the error so handleMemberIdentityError uses survivingId as the merge target + // instead of the already-absorbed original member. + if (survivingId !== initialSurvivingId) { + throw new ApplicationError( + 'identity conflict after redirect sync', + err instanceof Error ? err : undefined, + { survivingId }, + ) + } + throw err + } if (!nextRedirectId) { return survivingId @@ -601,11 +638,11 @@ export default class MemberService extends LoggerBase { } } - if (effectiveMemberId === id && identitiesToUpdate) { - this.log.trace({ memberId: id }, 'Updating identities!') + if (identitiesToUpdate) { + this.log.trace({ memberId: effectiveMemberId }, 'Updating identities!') try { await logExecutionTimeV2( - () => this.memberRepo.updateIdentities(id, identitiesToUpdate), + () => this.memberRepo.updateIdentities(effectiveMemberId, identitiesToUpdate), this.log, 'memberService -> update -> updateIdentities', ) From 71c6c8fdb4a77acc23d50eaa0e2e0992ad5339ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 5 May 2026 12:47:14 +0200 Subject: [PATCH 06/12] fix: no longer we depend on postgres error details to decode problematic identity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/service/activity.service.ts | 85 +++++++++---------- .../src/service/member.service.ts | 79 ++++++++++------- 2 files changed, 89 insertions(+), 75 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 36e7da2bf0..4f171308a4 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1708,56 +1708,55 @@ export default class ActivityService extends LoggerBase { return false } - const extractMetadata = async ( - error: any, - ): Promise | undefined> => { + const extractMetadata = async (): Promise | undefined> => { const metadata: Record = {} - // extract the platform, value, type from the detail - const detail = error.detail - const regex = /\(platform, value, type\)=\((.*?)\)/ - const match = detail.match(regex) + const incomingIdentities = + memberType === 'member' + ? payload.activity.member.identities + : payload.activity.objectMember.identities + const verifiedIncoming = incomingIdentities.filter((i) => i.verified) - if (!match || match.length < 2) { - return - } - - // Split the matched string by commas - const values = match[1].split(',').map((val) => val.trim()) + metadata.verifiedIdentities = verifiedIncoming - // Extract platform, value, and type - const [platform, value, type] = values - - metadata.erroredVerifiedIdentity = { - platform, - value, - type, + if (verifiedIncoming.length === 0) { + return undefined } - const membersWithIdentity = await findMembersByIdentities( - this.pgQx, - [ - { - platform, - value, - type, - verified: true, - } as IMemberIdentity, - ], - undefined, - true, - ) + // Use the structured identities array to find the owner — avoids fragile Postgres + // Detail text parsing (format not stable; breaks if value contains a comma). + const owners = await findMembersByIdentities(this.pgQx, verifiedIncoming, undefined, true) + + // Map keys are `${platform}:${type}:${value}` (from db rows). Match case-insensitively. + let conflictIdentity: IMemberIdentity | undefined + let ownerId: string | undefined + outer: for (const id of verifiedIncoming) { + for (const [key, oid] of owners) { + const sep1 = key.indexOf(':') + const sep2 = key.indexOf(':', sep1 + 1) + if (sep1 < 0 || sep2 < 0) continue + if ( + key.slice(0, sep1) === id.platform && + key.slice(sep1 + 1, sep2) === id.type && + key.slice(sep2 + 1).toLowerCase() === id.value.toLowerCase() + ) { + conflictIdentity = id + ownerId = oid + break outer + } + } + } - if (memberType === 'member') { - metadata.verifiedIdentities = payload.activity.member.identities.filter((i) => i.verified) - } else { - metadata.verifiedIdentities = payload.activity.objectMember.identities.filter( - (i) => i.verified, - ) + if (conflictIdentity) { + metadata.erroredVerifiedIdentity = { + platform: conflictIdentity.platform, + value: conflictIdentity.value, + type: conflictIdentity.type, + } } - if (membersWithIdentity.size > 0) { - metadata.memberWithIdentity = membersWithIdentity.values().next().value + if (ownerId) { + metadata.memberWithIdentity = ownerId } if (dbMember) { @@ -1860,7 +1859,7 @@ export default class ActivityService extends LoggerBase { while (nextError) { if (checkForIdentityConstraint(nextError)) { - return extractMetadata(nextError) + return extractMetadata() } else if (nextError instanceof ApplicationError) { nextError = nextError.originalError } else { @@ -1868,7 +1867,7 @@ export default class ActivityService extends LoggerBase { } } } else if (checkForIdentityConstraint(error)) { - return extractMetadata(error) + return extractMetadata() } return undefined diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 114f193304..629456816a 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -21,7 +21,7 @@ import { import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer' import { findIdentitiesForMembers, - findMemberIdByVerifiedIdentity, + findMembersByIdentities, findMembersByVerifiedUsernames, } from '@crowd/data-access-layer' import { DbStore } from '@crowd/data-access-layer/src/database' @@ -137,44 +137,59 @@ export default class MemberService extends LoggerBase { } catch (err) { if ( !err?.constraint || - err.constraint !== 'uix_memberIdentities_platform_value_type_verified' || - !err.detail + err.constraint !== 'uix_memberIdentities_platform_value_type_verified' ) { throw err } - const match = (err.detail as string).match(/\(platform, value, type\)=\((.*?)\)/) - if (!match) throw err - const [platform, value, type] = match[1].split(',').map((s: string) => s.trim()) - - const owner = await findMemberIdByVerifiedIdentity( - this.pgQx, - platform, - value, - type as MemberIdentityType, - ) - if (owner && owner !== memberId) { - if (!attemptMerge) { - return owner + const verifiedIncoming = identities.filter((i) => i.verified) + if (verifiedIncoming.length === 0) throw err + + // Use the structured identities array to find the owner — avoids fragile Postgres + // Detail text parsing (format not stable; breaks if value contains a comma). + const owners = await findMembersByIdentities(this.pgQx, verifiedIncoming, undefined, true) + + // Map keys are `${platform}:${type}:${value}` (from db rows). Match case-insensitively. + let owner: string | undefined + outer: for (const id of verifiedIncoming) { + for (const [key, ownerId] of owners) { + const sep1 = key.indexOf(':') + const sep2 = key.indexOf(':', sep1 + 1) + if (sep1 < 0 || sep2 < 0) continue + if ( + key.slice(0, sep1) === id.platform && + key.slice(sep1 + 1, sep2) === id.type && + key.slice(sep2 + 1).toLowerCase() === id.value.toLowerCase() && + ownerId !== memberId + ) { + owner = ownerId + break outer + } } + } - let merged: boolean - try { - merged = await mergeIfAllowed(this.pgQx, this.temporal, this.log, owner, memberId) - } catch (mergeErr) { - // Re-throw the original constraint error, not the merge error. If we let the merge - // error propagate, handleMemberIdentityError's checkForIdentityConstraint would - // return false and no metadata would be stored in integration.results.error. - // Re-throwing the constraint error lets handleMemberIdentityError retry the merge. - this.log.warn( - mergeErr, - { memberId, owner, platform, value, type }, - 'merge threw during identity conflict — re-throwing constraint error for retry', - ) - throw err - } - if (merged) return owner + if (!owner) throw err + + if (!attemptMerge) { + return owner + } + + let merged: boolean + try { + merged = await mergeIfAllowed(this.pgQx, this.temporal, this.log, owner, memberId) + } catch (mergeErr) { + // Re-throw the original constraint error, not the merge error. If we let the merge + // error propagate, handleMemberIdentityError's checkForIdentityConstraint would + // return false and no metadata would be stored in integration.results.error. + // Re-throwing the constraint error lets handleMemberIdentityError retry the merge. + this.log.warn( + mergeErr, + { memberId, owner }, + 'merge threw during identity conflict — re-throwing constraint error for retry', + ) + throw err } + if (merged) return owner // noMerge, owner not found, or stale prefetch (owner === memberId): // Re-throw original constraint DatabaseError. handleMemberIdentityError will call From 7c2b4c0ab427e66a2db752e3bc8aadfa1fb77a87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 5 May 2026 21:51:31 +0200 Subject: [PATCH 07/12] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/service/activity.service.ts | 40 ++++++++---- .../src/service/member.service.ts | 61 ++++++++++--------- 2 files changed, 60 insertions(+), 41 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 4f171308a4..98d3c4282a 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1312,16 +1312,27 @@ export default class ActivityService extends LoggerBase { dbMemberIdentities = await findIdentitiesForMembers(this.pgQx, Array.from(memberIds)) } + // Tracks merge redirects across payloads in this batch. When a member update redirects + // X→Y (merge), subsequent payloads for the same platform:username skip the update and + // reuse Y — avoids calling update() on an already-absorbed member row. + const memberMap = new Map() + for (const payload of relevantPayloads) { - // contains the merged member ids - const memberMap = new Map() + const memberKey = `${payload.platform}:${payload.activity.username}` + const objectMemberKey = `${payload.platform}:${payload.activity.objectMemberUsername}` + // When actor and objectActor are the same person, skip the objectMember update and + // copy memberId after Promise.all resolves. + const sameActorKey = !!( + payload.dbMember && + payload.dbObjectMember && + memberKey === objectMemberKey + ) const promises = [] // update members and orgs with them if (payload.dbMember) { - const key = `${payload.platform}:${payload.activity.username}` - if (memberMap.has(key)) { - payload.memberId = memberMap.get(key) + if (memberMap.has(memberKey)) { + payload.memberId = memberMap.get(memberKey) } else { promises.push( memberService @@ -1347,7 +1358,7 @@ export default class ActivityService extends LoggerBase { .then((redirectId?: string) => { payload.memberId = redirectId ?? payload.dbMember.id if (redirectId) { - memberMap.set(key, redirectId) + memberMap.set(memberKey, redirectId) } }) .catch(async (err) => { @@ -1360,7 +1371,7 @@ export default class ActivityService extends LoggerBase { if (result) { if (typeof result === 'string') { payload.memberId = result - memberMap.set(key, result) + memberMap.set(memberKey, result) } else { resultMap.set(payload.resultId, { success: false, @@ -1379,10 +1390,9 @@ export default class ActivityService extends LoggerBase { } } - if (payload.dbObjectMember) { - const key = `${payload.platform}:${payload.activity.objectMemberUsername}` - if (memberMap.has(key)) { - payload.objectMemberId = memberMap.get(key) + if (payload.dbObjectMember && !sameActorKey) { + if (memberMap.has(objectMemberKey)) { + payload.objectMemberId = memberMap.get(objectMemberKey) } else { promises.push( memberService @@ -1408,7 +1418,7 @@ export default class ActivityService extends LoggerBase { .then((redirectId?: string) => { payload.objectMemberId = redirectId ?? payload.dbObjectMember.id if (redirectId) { - memberMap.set(key, redirectId) + memberMap.set(objectMemberKey, redirectId) } }) .catch(async (err) => { @@ -1421,7 +1431,7 @@ export default class ActivityService extends LoggerBase { if (result) { if (typeof result === 'string') { payload.objectMemberId = result - memberMap.set(key, result) + memberMap.set(objectMemberKey, result) } else { resultMap.set(payload.resultId, { success: false, @@ -1442,6 +1452,10 @@ export default class ActivityService extends LoggerBase { await Promise.all(promises) + if (sameActorKey) { + payload.objectMemberId = payload.memberId + } + if (resultMap.has(payload.resultId)) { continue } diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 629456816a..c96987b32a 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -399,47 +399,52 @@ export default class MemberService extends LoggerBase { throw err } + let effectiveMemberId: string + if (redirectId) { await this.scheduleOrphanMemberDeletion(id) - const finalMemberId = await this.syncIdentitiesAfterRedirect( + effectiveMemberId = await this.syncIdentitiesAfterRedirect( redirectId, integrationId, data.identities, ) await logExecutionTimeV2( - () => this.memberRepo.addToSegments(finalMemberId, segmentIds), + () => this.memberRepo.addToSegments(effectiveMemberId, segmentIds), this.log, 'memberService -> create -> addToSegments (conflict redirect)', ) - return finalMemberId - } - - try { - await logExecutionTimeV2( - () => this.memberRepo.addToSegments(id, segmentIds), - this.log, - 'memberService -> create -> addToSegments', - ) - } catch (err) { - this.log.error(err, { memberId: id }, 'Error while adding member to segments!') - await logExecutionTimeV2( - async () => this.memberRepo.destroyMemberAfterError(id, true), - this.log, - 'memberService -> create -> destroyMemberAfterError', - ) - throw err + } else { + effectiveMemberId = id + try { + await logExecutionTimeV2( + () => this.memberRepo.addToSegments(id, segmentIds), + this.log, + 'memberService -> create -> addToSegments', + ) + } catch (err) { + this.log.error(err, { memberId: id }, 'Error while adding member to segments!') + await logExecutionTimeV2( + async () => this.memberRepo.destroyMemberAfterError(id, true), + this.log, + 'memberService -> create -> destroyMemberAfterError', + ) + throw err + } } // we should prevent organization creation for bot members if (botDetection === MemberBotDetection.CONFIRMED_BOT) { this.log.debug('Skipping organization creation for bot member') - return id + return effectiveMemberId } // trigger LLM validation if the member is suspected as a bot if (botDetection === MemberBotDetection.SUSPECTED_BOT) { - this.log.debug({ memberId: id }, 'Member suspected as bot. Triggering LLM validation.') - await this.startMemberBotAnalysisWithLLMWorkflow(id) + this.log.debug( + { memberId: effectiveMemberId }, + 'Member suspected as bot. Triggering LLM validation.', + ) + await this.startMemberBotAnalysisWithLLMWorkflow(effectiveMemberId) } const organizations = [] @@ -470,9 +475,9 @@ export default class MemberService extends LoggerBase { orgIdPromise.catch(() => orgPromiseCache?.delete(key)) } } - const id = await orgIdPromise + const orgId = await orgIdPromise organizations.push({ - id, + id: orgId, source: org.source, }) } @@ -488,7 +493,7 @@ export default class MemberService extends LoggerBase { integrationId, emailIdentities.map((i) => i.value), orgPromiseCache, - id, + effectiveMemberId, activityTimestamp, ), this.log, @@ -508,7 +513,7 @@ export default class MemberService extends LoggerBase { // Check if the org was already added to the member in the past, including deleted ones. // If it was, we ignore this org to prevent from adding it again. const existingMemberOrgs = await logExecutionTimeV2( - () => orgService.findMemberOrganizations(id, org.id), + () => orgService.findMemberOrganizations(effectiveMemberId, org.id), this.log, 'memberService -> create -> findMemberOrganizations', ) @@ -519,14 +524,14 @@ export default class MemberService extends LoggerBase { if (orgsToAdd.length > 0) { await logExecutionTimeV2( - () => orgService.addToMember(segmentIds, id, orgsToAdd), + () => orgService.addToMember(segmentIds, effectiveMemberId, orgsToAdd), this.log, 'memberService -> create -> addToMember', ) } } - return id + return effectiveMemberId } catch (err) { this.log.error(err, 'Error while creating a new member!') throw err From c19e09e28f5d29eefcb6a17114955d2403137296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 5 May 2026 23:29:17 +0200 Subject: [PATCH 08/12] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/service/activity.service.ts | 3 +-- .../data-access-layer/src/members/identities.ts | 14 +++++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 98d3c4282a..2395beb39a 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1713,8 +1713,7 @@ export default class ActivityService extends LoggerBase { error.constructor && error.constructor.name === 'DatabaseError' && error.constraint && - error.constraint === 'uix_memberIdentities_platform_value_type_verified' && - error.detail + error.constraint === 'uix_memberIdentities_platform_value_type_verified' ) { return true } diff --git a/services/libs/data-access-layer/src/members/identities.ts b/services/libs/data-access-layer/src/members/identities.ts index 94e019ebb4..a525128965 100644 --- a/services/libs/data-access-layer/src/members/identities.ts +++ b/services/libs/data-access-layer/src/members/identities.ts @@ -572,9 +572,13 @@ export async function findMembersByIdentities( conditions.push('mi.verified = true') } - const identityParams = identities - .map((identity) => `('${identity.platform}', '${identity.value}', '${identity.type}')`) - .join(', ') + const identityTuples = identities.map((identity, i) => { + params[`ip${i}`] = identity.platform + params[`iv${i}`] = identity.value + params[`it${i}`] = identity.type + return `($(ip${i}), $(iv${i}), $(it${i}))` + }) + const identityParams = identityTuples.join(', ') const result = await qx.select( ` @@ -583,8 +587,8 @@ export async function findMembersByIdentities( ) select "memberId", i.platform, i.value, i.type from "memberIdentities" mi - inner join input_identities i - on mi.platform = i.platform + inner join input_identities i + on mi.platform = i.platform and lower(mi.value) = lower(i.value) and mi.type = i.type and mi."deletedAt" is null From 684f9ed3c7463bd28ada9c3015ab19900936c3ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 5 May 2026 23:52:22 +0200 Subject: [PATCH 09/12] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/apps/data_sink_worker/src/service/activity.service.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 2395beb39a..7276dc298a 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1751,7 +1751,8 @@ export default class ActivityService extends LoggerBase { if ( key.slice(0, sep1) === id.platform && key.slice(sep1 + 1, sep2) === id.type && - key.slice(sep2 + 1).toLowerCase() === id.value.toLowerCase() + key.slice(sep2 + 1).toLowerCase() === id.value.toLowerCase() && + oid !== dbMember?.id ) { conflictIdentity = id ownerId = oid From 3918348f10ff78f0719c3f1ced1db41a099209c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Wed, 6 May 2026 00:16:35 +0200 Subject: [PATCH 10/12] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/service/activity.service.ts | 7 ++++++- .../apps/data_sink_worker/src/service/member.service.ts | 4 ++-- services/libs/data-access-layer/src/members/identities.ts | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 7276dc298a..7a9925a2f0 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1333,6 +1333,7 @@ export default class ActivityService extends LoggerBase { if (payload.dbMember) { if (memberMap.has(memberKey)) { payload.memberId = memberMap.get(memberKey) + payload.dbMember = undefined } else { promises.push( memberService @@ -1393,6 +1394,7 @@ export default class ActivityService extends LoggerBase { if (payload.dbObjectMember && !sameActorKey) { if (memberMap.has(objectMemberKey)) { payload.objectMemberId = memberMap.get(objectMemberKey) + payload.dbObjectMember = undefined } else { promises.push( memberService @@ -1751,7 +1753,10 @@ export default class ActivityService extends LoggerBase { if ( key.slice(0, sep1) === id.platform && key.slice(sep1 + 1, sep2) === id.type && - key.slice(sep2 + 1).toLowerCase() === id.value.toLowerCase() && + key + .slice(sep2 + 1) + .trim() + .toLowerCase() === id.value.trim().toLowerCase() && oid !== dbMember?.id ) { conflictIdentity = id diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index c96987b32a..23b27cf9a7 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -225,7 +225,7 @@ export default class MemberService extends LoggerBase { !freshIdentities.some( (existing) => existing.platform === incoming.platform && - existing.value === incoming.value && + existing.value.trim().toLowerCase() === incoming.value.trim().toLowerCase() && existing.type === incoming.type, ), ) @@ -238,7 +238,7 @@ export default class MemberService extends LoggerBase { freshIdentities.some( (existing) => existing.platform === incoming.platform && - existing.value === incoming.value && + existing.value.trim().toLowerCase() === incoming.value.trim().toLowerCase() && existing.type === incoming.type && !existing.verified, ), diff --git a/services/libs/data-access-layer/src/members/identities.ts b/services/libs/data-access-layer/src/members/identities.ts index a525128965..69a7359130 100644 --- a/services/libs/data-access-layer/src/members/identities.ts +++ b/services/libs/data-access-layer/src/members/identities.ts @@ -574,7 +574,7 @@ export async function findMembersByIdentities( const identityTuples = identities.map((identity, i) => { params[`ip${i}`] = identity.platform - params[`iv${i}`] = identity.value + params[`iv${i}`] = identity.value.trim() params[`it${i}`] = identity.type return `($(ip${i}), $(iv${i}), $(it${i}))` }) From 69748ed53dcafa60f61f5d65d8510ed7c9fb3676 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Wed, 6 May 2026 07:52:36 +0200 Subject: [PATCH 11/12] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/apps/data_sink_worker/src/service/member.service.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 23b27cf9a7..35775a7e6a 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -159,7 +159,10 @@ export default class MemberService extends LoggerBase { if ( key.slice(0, sep1) === id.platform && key.slice(sep1 + 1, sep2) === id.type && - key.slice(sep2 + 1).toLowerCase() === id.value.toLowerCase() && + key + .slice(sep2 + 1) + .trim() + .toLowerCase() === id.value.trim().toLowerCase() && ownerId !== memberId ) { owner = ownerId From e5b9112d47d18858932a482c95421470ca3e3cd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Wed, 6 May 2026 08:51:19 +0200 Subject: [PATCH 12/12] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/service/activity.service.ts | 39 ++++++++++++++++++- .../src/service/member.service.ts | 17 +++++++- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 7a9925a2f0..849335367b 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1372,7 +1372,11 @@ export default class ActivityService extends LoggerBase { if (result) { if (typeof result === 'string') { payload.memberId = result - memberMap.set(memberKey, result) + // Only cache real redirects — stale-prefetch returns the member's own ID + // and the member still exists, so subsequent payloads must still call update(). + if (result !== payload.dbMember.id) { + memberMap.set(memberKey, result) + } } else { resultMap.set(payload.resultId, { success: false, @@ -1433,7 +1437,11 @@ export default class ActivityService extends LoggerBase { if (result) { if (typeof result === 'string') { payload.objectMemberId = result - memberMap.set(objectMemberKey, result) + // Only cache real redirects — stale-prefetch returns the member's own ID + // and the member still exists, so subsequent payloads must still call update(). + if (result !== payload.dbObjectMember.id) { + memberMap.set(objectMemberKey, result) + } } else { resultMap.set(payload.resultId, { success: false, @@ -1745,6 +1753,8 @@ export default class ActivityService extends LoggerBase { // Map keys are `${platform}:${type}:${value}` (from db rows). Match case-insensitively. let conflictIdentity: IMemberIdentity | undefined let ownerId: string | undefined + + // Pass 1: find an identity owned by a different member (real conflict). outer: for (const id of verifiedIncoming) { for (const [key, oid] of owners) { const sep1 = key.indexOf(':') @@ -1766,6 +1776,31 @@ export default class ActivityService extends LoggerBase { } } + // Pass 2: if no external conflict, check whether this member already owns the + // identity (stale-prefetch race). Re-uses the owners map — no extra DB query. + if (!ownerId && dbMember) { + selfCheck: for (const id of verifiedIncoming) { + for (const [key, oid] of owners) { + const sep1 = key.indexOf(':') + const sep2 = key.indexOf(':', sep1 + 1) + if (sep1 < 0 || sep2 < 0) continue + if ( + key.slice(0, sep1) === id.platform && + key.slice(sep1 + 1, sep2) === id.type && + key + .slice(sep2 + 1) + .trim() + .toLowerCase() === id.value.trim().toLowerCase() && + oid === dbMember.id + ) { + conflictIdentity = id + ownerId = oid + break selfCheck + } + } + } + } + if (conflictIdentity) { metadata.erroredVerifiedIdentity = { platform: conflictIdentity.platform, diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 35775a7e6a..74af51d9b0 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -132,8 +132,21 @@ export default class MemberService extends LoggerBase { identities: IMemberIdentity[], attemptMerge = true, ): Promise { + // Deduplicate by (platform, value, type) — prefer verified=true when the same + // identity appears with both flags. Prevents uix_memberIdentities_memberId_platform_value_type + // from firing when a payload contains the same identity twice with different verified values. + const seen = new Map() + for (const id of identities) { + const key = `${id.platform}:${id.type}:${id.value.trim().toLowerCase()}` + const existing = seen.get(key) + if (!existing || (!existing.verified && id.verified)) { + seen.set(key, id) + } + } + const deduped = Array.from(seen.values()) + try { - await this.memberRepo.insertIdentities(memberId, integrationId, identities, true) + await this.memberRepo.insertIdentities(memberId, integrationId, deduped, true) } catch (err) { if ( !err?.constraint || @@ -142,7 +155,7 @@ export default class MemberService extends LoggerBase { throw err } - const verifiedIncoming = identities.filter((i) => i.verified) + const verifiedIncoming = deduped.filter((i) => i.verified) if (verifiedIncoming.length === 0) throw err // Use the structured identities array to find the owner — avoids fragile Postgres