From 8a8da85a9dff41ca398b21532a11a727ec170ef5 Mon Sep 17 00:00:00 2001 From: Artem Savchenko Date: Thu, 4 Dec 2025 14:16:09 +0700 Subject: [PATCH 1/3] Fix duplicated gmail messages Signed-off-by: Artem Savchenko --- .../src/__tests__/gmailClient.test.ts | 438 +++++++++++++++++- services/gmail/pod-gmail/src/gmail.ts | 37 ++ 2 files changed, 473 insertions(+), 2 deletions(-) diff --git a/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts b/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts index ee1fff8320d..0846797f407 100644 --- a/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts +++ b/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts @@ -96,6 +96,9 @@ jest.mock('../tokens') jest.mock('../message/adapter') jest.mock('../message/sync') jest.mock('../message/attachments') +jest.mock('../message/v2/send', () => ({ + makeHTMLBodyV2: jest.fn().mockResolvedValue('encoded-html-body') +})) jest.mock('@hcengineering/server-core', () => ({ withContext: jest.fn().mockImplementation((name: string) => { return function (target: any, propertyKey: string, descriptor: PropertyDescriptor) { @@ -124,7 +127,8 @@ jest.mock('../utils', () => ({ deleteKey: jest.fn().mockImplementation(() => Promise.resolve(undefined)), listKeys: jest.fn() })), - getSpaceId: jest.fn().mockReturnValue('test-space-id') + getSpaceId: jest.fn().mockReturnValue('test-space-id'), + createGmailSearchQuery: jest.fn().mockReturnValue('after:2024-01-01 before:2024-01-02 from:test@example.com') })) // Mock gmail module @@ -144,9 +148,17 @@ jest.mock('@hcengineering/setting', () => ({ } })) +// Mock chat module +jest.mock('@hcengineering/chat', () => ({ + masterTag: { + Thread: 'chat.class.Thread' + } +})) + // Mock config jest.mock('../config', () => ({ - WATCH_TOPIC_NAME: 'test-topic' + WATCH_TOPIC_NAME: 'test-topic', + OutgoingSyncStartDate: new Date('2020-01-01') })) jest.mock('@hcengineering/account-client', () => ({ @@ -594,4 +606,426 @@ describe('GmailClient', () => { }) }) }) + + describe('Deduplication - createMessage (V1)', () => { + let client: GmailClient + let mockTxOperations: any + + beforeEach(async () => { + mockTxOperations = { + findAll: jest.fn().mockResolvedValue([]), + findOne: jest.fn().mockResolvedValue(undefined), + createDoc: jest.fn().mockResolvedValue({}), + update: jest.fn().mockResolvedValue({}), + remove: jest.fn().mockResolvedValue({}), + updateDoc: jest.fn().mockResolvedValue({}), + tx: jest.fn().mockResolvedValue({}) + } + + client = await GmailClient.create( + mockContext as any, + mockCredentials, + mockUser, + mockClient as any, + mockWorkspaceClient as any, + mockWorkspaceId, + mockStorageAdapter as any + ) + + // Replace the private TxOperations instance + Object.defineProperty(client, 'client', { + value: mockTxOperations, + writable: true + }) + + // Reset the gmail.messages.send mock + mockedGmailUsers.messages.send.mockClear() + }) + + it('should send message only once when called concurrently', async () => { + const newMessage = { + _id: 'test-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + // Call createMessage twice concurrently + const promise1 = client.createMessage(newMessage as any) + const promise2 = client.createMessage(newMessage as any) + + await Promise.all([promise1, promise2]) + + // Should only send once + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + + // Should log that duplicate was skipped + expect(mockContext.info).toHaveBeenCalledWith( + 'Message already being processed, skipping duplicate', + expect.objectContaining({ + messageKey: 'v1-test-message-id' + }) + ) + }) + + it('should allow retry after first call completes', async () => { + const newMessage = { + _id: 'test-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + // First call + await client.createMessage(newMessage as any) + + // Reset mocks to check second call + mockedGmailUsers.messages.send.mockClear() + mockTxOperations.updateDoc.mockClear() + + // Second call after first completes - should succeed since lock is released + await client.createMessage(newMessage as any) + + // Should send again + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + expect(mockTxOperations.updateDoc).toHaveBeenCalledWith('class.NewMessage', 'space', 'test-message-id', { + status: 'sent' + }) + }) + + it('should skip message if already marked as sent', async () => { + const sentMessage = { + _id: 'sent-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'sent', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + await client.createMessage(sentMessage as any) + + // Should not send + expect(mockedGmailUsers.messages.send).not.toHaveBeenCalled() + }) + + it('should handle errors and still clean up lock', async () => { + const newMessage = { + _id: 'error-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + // Make send fail + mockedGmailUsers.messages.send.mockRejectedValueOnce(new Error('Send failed')) + + await client.createMessage(newMessage as any) + + // Should update status to error + expect(mockTxOperations.updateDoc).toHaveBeenCalledWith('class.NewMessage', 'space', 'error-message-id', { + status: 'error', + error: expect.any(String) + }) + + // Should be able to retry after first call completes (lock is released in finally) + mockedGmailUsers.messages.send.mockResolvedValueOnce({}) + mockTxOperations.updateDoc.mockClear() + + await client.createMessage(newMessage as any) + + // Should update status to sent this time + expect(mockTxOperations.updateDoc).toHaveBeenCalledWith('class.NewMessage', 'space', 'error-message-id', { + status: 'sent' + }) + }) + + it('should handle multiple different messages concurrently', async () => { + const message1 = { + _id: 'message-1', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient1@example.com', + subject: 'Subject 1', + content: '

Content 1

' + } + + const message2 = { + _id: 'message-2', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient2@example.com', + subject: 'Subject 2', + content: '

Content 2

' + } + + // Send two different messages concurrently + await Promise.all([client.createMessage(message1 as any), client.createMessage(message2 as any)]) + + // Should send both messages + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(2) + }) + }) + + describe('Deduplication - handleNewMessage (V2)', () => { + let client: GmailClient + let mockTxOperations: any + + beforeEach(async () => { + // Don't use fake timers for V2 tests to avoid rate limiter issues + mockTxOperations = { + findAll: jest.fn().mockResolvedValue([]), + findOne: jest.fn().mockResolvedValue({ + _id: 'test-thread-id', + parent: 'test-channel-id' + }), + createDoc: jest.fn().mockResolvedValue({}), + update: jest.fn().mockResolvedValue({}), + remove: jest.fn().mockResolvedValue({}), + updateDoc: jest.fn().mockResolvedValue({}), + tx: jest.fn().mockResolvedValue({}), + getHierarchy: jest.fn().mockReturnValue({ + isDerived: jest.fn().mockReturnValue(false) + }) + } + + client = await GmailClient.create( + mockContext as any, + mockCredentials, + mockUser, + mockClient as any, + mockWorkspaceClient as any, + mockWorkspaceId, + mockStorageAdapter as any + ) + + // Replace the private TxOperations instance + Object.defineProperty(client, 'client', { + value: mockTxOperations, + writable: true + }) + + // Mock integration to be configured + Object.defineProperty(client, 'integration', { + value: { + _id: 'test-integration', + data: { + integrationVersion: 'v2', + spaceId: 'test-space-id' + } + }, + writable: true + }) + + // Mock the email property + Object.defineProperty(client, 'email', { + value: 'test@example.com', + writable: true + }) + + // Reset the gmail.messages.send mock + mockedGmailUsers.messages.send.mockClear() + mockedGmailUsers.messages.list.mockResolvedValue({ data: { messages: [] } }) + }) + + it('should send V2 message only once when called concurrently', async () => { + const messageEvent = { + _id: 'test-message-id', + messageId: 'msg-123', + cardId: 'card-456', + socialId: mockSocialId._id, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + // Call handleNewMessage twice concurrently + const promise1 = client.handleNewMessage(messageEvent as any) + const promise2 = client.handleNewMessage(messageEvent as any) + + await Promise.all([promise1, promise2]) + + // Should only send once + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + + // Should log that duplicate was skipped + expect(mockContext.info).toHaveBeenCalledWith( + 'Message already being processed, skipping duplicate', + expect.objectContaining({ + messageKey: 'msg-123-card-456' + }) + ) + }) + + it('should use _id as fallback when messageId is not present', async () => { + const messageEvent = { + _id: 'test-message-id', + cardId: 'card-456', + socialId: mockSocialId._id, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + // Call handleNewMessage twice concurrently + const promise1 = client.handleNewMessage(messageEvent as any) + const promise2 = client.handleNewMessage(messageEvent as any) + + await Promise.all([promise1, promise2]) + + // Should only send once + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + + // Should log with _id in messageKey + expect(mockContext.info).toHaveBeenCalledWith( + 'Message already being processed, skipping duplicate', + expect.objectContaining({ + messageKey: 'test-message-id-card-456' + }) + ) + }) + + it('should allow V2 retry after first call completes', async () => { + const messageEvent = { + _id: 'test-message-id', + messageId: 'msg-123', + cardId: 'card-456', + socialId: mockSocialId._id, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + // First call + await client.handleNewMessage(messageEvent as any) + + // Reset mocks to check second call + mockedGmailUsers.messages.send.mockClear() + + // Second call after first completes - should succeed since lock is released + await client.handleNewMessage(messageEvent as any) + + // Should send again + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + }) + + it('should skip message if it belongs to different social ID', async () => { + const messageEvent = { + _id: 'test-message-id', + messageId: 'msg-123', + cardId: 'card-456', + socialId: 'different-social-id' as PersonId, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + await client.handleNewMessage(messageEvent as any) + + // Should not send + expect(mockedGmailUsers.messages.send).not.toHaveBeenCalled() + }) + + it('should handle V2 errors and still clean up lock', async () => { + const messageEvent = { + _id: 'test-message-id', + messageId: 'msg-123', + cardId: 'card-456', + socialId: mockSocialId._id, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + // Make send fail + mockedGmailUsers.messages.send.mockRejectedValueOnce(new Error('Send failed')) + + await client.handleNewMessage(messageEvent as any) + + // Should log error + expect(mockContext.error).toHaveBeenCalledWith('Send gmail message v2 error', expect.any(Object)) + + // Reset mocks for second call + mockedGmailUsers.messages.send.mockClear() + mockedGmailUsers.messages.send.mockResolvedValueOnce({}) + ;(mockContext.error as jest.Mock).mockClear() + + // Should be able to retry after first call completes (lock is released in finally) + await client.handleNewMessage(messageEvent as any) + + // Should send successfully this time (only 1 call since we cleared) + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + expect(mockContext.error).not.toHaveBeenCalled() + }) + }) + + describe('Deduplication - Processing Set Cleanup', () => { + let client: GmailClient + + beforeEach(async () => { + client = await GmailClient.create( + mockContext as any, + mockCredentials, + mockUser, + mockClient as any, + mockWorkspaceClient as any, + mockWorkspaceId, + mockStorageAdapter as any + ) + }) + + it('should clear processing set when client closes', async () => { + const mockTxOperations = { + findAll: jest.fn().mockResolvedValue([]), + findOne: jest.fn().mockResolvedValue(undefined), + updateDoc: jest.fn().mockResolvedValue({}) + } + + Object.defineProperty(client, 'client', { + value: mockTxOperations, + writable: true + }) + + const newMessage = { + _id: 'test-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + // Start processing a message + const promise = client.createMessage(newMessage as any) + + // Get access to the processing set + const processingMessages = (client as any).processingMessages + + // Verify message is in processing set + expect(processingMessages.size).toBeGreaterThan(0) + + await promise + + // Close the client + await client.close() + + // Verify processing set is cleared + expect(processingMessages.size).toBe(0) + }) + }) }) diff --git a/services/gmail/pod-gmail/src/gmail.ts b/services/gmail/pod-gmail/src/gmail.ts index d554f8ccbfe..e243244aad6 100644 --- a/services/gmail/pod-gmail/src/gmail.ts +++ b/services/gmail/pod-gmail/src/gmail.ts @@ -116,6 +116,7 @@ export class GmailClient { private integration: Integration | null | undefined = undefined private syncStarted: boolean = false private channel: Card | undefined = undefined + private readonly processingMessages = new Set() private constructor ( private readonly ctx: MeasureContext, @@ -339,6 +340,21 @@ export class GmailClient { async createMessage (message: NewMessage): Promise { if (message.status === 'sent') return + + const messageKey = `v1-${message._id}` + + if (this.processingMessages.has(messageKey)) { + this.ctx.info('Message already being processed, skipping duplicate', { + messageKey, + messageId: message._id, + status: message.status, + email: this.email + }) + return + } + + this.processingMessages.add(messageKey) + try { this.ctx.info('Send gmail message', { id: message._id, from: message.from, to: message.to }) const email = await this.getEmail() @@ -370,10 +386,25 @@ export class GmailClient { if (err?.response?.data?.error === 'invalid_grant') { await this.refreshToken() } + } finally { + this.processingMessages.delete(messageKey) } } async handleNewMessage (message: CreateMessageEvent): Promise { + const messageKey = `${message.messageId ?? message._id}-${message.cardId}` + + if (this.processingMessages.has(messageKey)) { + this.ctx.info('Message already being processed, skipping duplicate', { + messageKey, + cardId: message.cardId, + email: this.email + }) + return + } + + this.processingMessages.add(messageKey) + try { const personId = message.socialId if (personId !== this.socialId._id && !this.allSocialIds.has(personId)) { @@ -423,6 +454,8 @@ export class GmailClient { if (err?.response?.data?.error === 'invalid_grant') { await this.refreshToken() } + } finally { + this.processingMessages.delete(messageKey) } } @@ -760,6 +793,10 @@ export class GmailClient { async close (): Promise { if (this.watchTimer !== undefined) clearInterval(this.watchTimer) if (this.refreshTimer !== undefined) clearTimeout(this.refreshTimer) + + // Clear processing messages set + this.processingMessages.clear() + try { if (this.syncManager !== undefined) { this.syncManager.close() From e5ecfca6a01eeec9d605c19a72ad0af745f7dcee Mon Sep 17 00:00:00 2001 From: Artem Savchenko Date: Thu, 4 Dec 2025 14:23:16 +0700 Subject: [PATCH 2/3] Reuse one set Signed-off-by: Artem Savchenko --- .../src/__tests__/gmailClient.test.ts | 30 +++++++++++-------- services/gmail/pod-gmail/src/gmail.ts | 28 ++++++++--------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts b/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts index 0846797f407..cadc67a1789 100644 --- a/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts +++ b/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts @@ -612,6 +612,9 @@ describe('GmailClient', () => { let mockTxOperations: any beforeEach(async () => { + // Clear the global processing set before each test + ;(GmailClient as any).processingMessages.clear() + mockTxOperations = { findAll: jest.fn().mockResolvedValue([]), findOne: jest.fn().mockResolvedValue(undefined), @@ -667,7 +670,7 @@ describe('GmailClient', () => { expect(mockContext.info).toHaveBeenCalledWith( 'Message already being processed, skipping duplicate', expect.objectContaining({ - messageKey: 'v1-test-message-id' + messageKey: expect.stringContaining('v1-test-workspace-test-social-id-test-message-id') }) ) }) @@ -790,6 +793,9 @@ describe('GmailClient', () => { let mockTxOperations: any beforeEach(async () => { + // Clear the global processing set before each test + ;(GmailClient as any).processingMessages.clear() + // Don't use fake timers for V2 tests to avoid rate limiter issues mockTxOperations = { findAll: jest.fn().mockResolvedValue([]), @@ -869,7 +875,7 @@ describe('GmailClient', () => { expect(mockContext.info).toHaveBeenCalledWith( 'Message already being processed, skipping duplicate', expect.objectContaining({ - messageKey: 'msg-123-card-456' + messageKey: expect.stringContaining('v2-test-workspace-test-social-id-msg-123-card-456') }) ) }) @@ -896,7 +902,7 @@ describe('GmailClient', () => { expect(mockContext.info).toHaveBeenCalledWith( 'Message already being processed, skipping duplicate', expect.objectContaining({ - messageKey: 'test-message-id-card-456' + messageKey: expect.stringContaining('v2-test-workspace-test-social-id-test-message-id-card-456') }) ) }) @@ -987,7 +993,7 @@ describe('GmailClient', () => { ) }) - it('should clear processing set when client closes', async () => { + it('should clean up message from global processing set after completion', async () => { const mockTxOperations = { findAll: jest.fn().mockResolvedValue([]), findOne: jest.fn().mockResolvedValue(undefined), @@ -1010,21 +1016,21 @@ describe('GmailClient', () => { content: '

Test content

' } + // Get access to the global static processing set + const processingMessages = (GmailClient as any).processingMessages + + // Verify set is empty initially + expect(processingMessages.size).toBe(0) + // Start processing a message const promise = client.createMessage(newMessage as any) - // Get access to the processing set - const processingMessages = (client as any).processingMessages - - // Verify message is in processing set + // Verify message is in processing set while processing expect(processingMessages.size).toBeGreaterThan(0) await promise - // Close the client - await client.close() - - // Verify processing set is cleared + // Verify message is removed from processing set after completion expect(processingMessages.size).toBe(0) }) }) diff --git a/services/gmail/pod-gmail/src/gmail.ts b/services/gmail/pod-gmail/src/gmail.ts index e243244aad6..611fae7a3bc 100644 --- a/services/gmail/pod-gmail/src/gmail.ts +++ b/services/gmail/pod-gmail/src/gmail.ts @@ -102,6 +102,8 @@ async function wait (sec: number): Promise { } export class GmailClient { + private static readonly processingMessages = new Set() + private readonly account: AccountUuid private email: string private readonly tokenStorage: TokenStorage @@ -116,7 +118,6 @@ export class GmailClient { private integration: Integration | null | undefined = undefined private syncStarted: boolean = false private channel: Card | undefined = undefined - private readonly processingMessages = new Set() private constructor ( private readonly ctx: MeasureContext, @@ -341,19 +342,20 @@ export class GmailClient { async createMessage (message: NewMessage): Promise { if (message.status === 'sent') return - const messageKey = `v1-${message._id}` + const messageKey = `v1-${this.user.workspace}-${this.socialId._id}-${message._id}` - if (this.processingMessages.has(messageKey)) { + if (GmailClient.processingMessages.has(messageKey)) { this.ctx.info('Message already being processed, skipping duplicate', { messageKey, messageId: message._id, status: message.status, - email: this.email + email: this.email, + workspace: this.user.workspace }) return } - this.processingMessages.add(messageKey) + GmailClient.processingMessages.add(messageKey) try { this.ctx.info('Send gmail message', { id: message._id, from: message.from, to: message.to }) @@ -387,23 +389,24 @@ export class GmailClient { await this.refreshToken() } } finally { - this.processingMessages.delete(messageKey) + GmailClient.processingMessages.delete(messageKey) } } async handleNewMessage (message: CreateMessageEvent): Promise { - const messageKey = `${message.messageId ?? message._id}-${message.cardId}` + const messageKey = `v2-${this.user.workspace}-${this.socialId._id}-${message.messageId ?? message._id}-${message.cardId}` - if (this.processingMessages.has(messageKey)) { + if (GmailClient.processingMessages.has(messageKey)) { this.ctx.info('Message already being processed, skipping duplicate', { messageKey, cardId: message.cardId, - email: this.email + email: this.email, + workspace: this.user.workspace }) return } - this.processingMessages.add(messageKey) + GmailClient.processingMessages.add(messageKey) try { const personId = message.socialId @@ -455,7 +458,7 @@ export class GmailClient { await this.refreshToken() } } finally { - this.processingMessages.delete(messageKey) + GmailClient.processingMessages.delete(messageKey) } } @@ -794,9 +797,6 @@ export class GmailClient { if (this.watchTimer !== undefined) clearInterval(this.watchTimer) if (this.refreshTimer !== undefined) clearTimeout(this.refreshTimer) - // Clear processing messages set - this.processingMessages.clear() - try { if (this.syncManager !== undefined) { this.syncManager.close() From 3ddc9bda254f6d2b68325e1d5947b4cd2867ac66 Mon Sep 17 00:00:00 2001 From: Artem Savchenko Date: Thu, 4 Dec 2025 14:25:51 +0700 Subject: [PATCH 3/3] Clean up Signed-off-by: Artem Savchenko --- services/gmail/pod-gmail/src/gmail.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/services/gmail/pod-gmail/src/gmail.ts b/services/gmail/pod-gmail/src/gmail.ts index 611fae7a3bc..2c9a9e83134 100644 --- a/services/gmail/pod-gmail/src/gmail.ts +++ b/services/gmail/pod-gmail/src/gmail.ts @@ -796,7 +796,6 @@ export class GmailClient { async close (): Promise { if (this.watchTimer !== undefined) clearInterval(this.watchTimer) if (this.refreshTimer !== undefined) clearTimeout(this.refreshTimer) - try { if (this.syncManager !== undefined) { this.syncManager.close()