diff --git a/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts b/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts index ee1fff8320d..cadc67a1789 100644 --- a/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts +++ b/services/gmail/pod-gmail/src/__tests__/gmailClient.test.ts @@ -96,6 +96,9 @@ jest.mock('../tokens') jest.mock('../message/adapter') jest.mock('../message/sync') jest.mock('../message/attachments') +jest.mock('../message/v2/send', () => ({ + makeHTMLBodyV2: jest.fn().mockResolvedValue('encoded-html-body') +})) jest.mock('@hcengineering/server-core', () => ({ withContext: jest.fn().mockImplementation((name: string) => { return function (target: any, propertyKey: string, descriptor: PropertyDescriptor) { @@ -124,7 +127,8 @@ jest.mock('../utils', () => ({ deleteKey: jest.fn().mockImplementation(() => Promise.resolve(undefined)), listKeys: jest.fn() })), - getSpaceId: jest.fn().mockReturnValue('test-space-id') + getSpaceId: jest.fn().mockReturnValue('test-space-id'), + createGmailSearchQuery: jest.fn().mockReturnValue('after:2024-01-01 before:2024-01-02 from:test@example.com') })) // Mock gmail module @@ -144,9 +148,17 @@ jest.mock('@hcengineering/setting', () => ({ } })) +// Mock chat module +jest.mock('@hcengineering/chat', () => ({ + masterTag: { + Thread: 'chat.class.Thread' + } +})) + // Mock config jest.mock('../config', () => ({ - WATCH_TOPIC_NAME: 'test-topic' + WATCH_TOPIC_NAME: 'test-topic', + OutgoingSyncStartDate: new Date('2020-01-01') })) jest.mock('@hcengineering/account-client', () => ({ @@ -594,4 +606,432 @@ describe('GmailClient', () => { }) }) }) + + describe('Deduplication - createMessage (V1)', () => { + let client: GmailClient + let mockTxOperations: any + + beforeEach(async () => { + // Clear the global processing set before each test + ;(GmailClient as any).processingMessages.clear() + + mockTxOperations = { + findAll: jest.fn().mockResolvedValue([]), + findOne: jest.fn().mockResolvedValue(undefined), + createDoc: jest.fn().mockResolvedValue({}), + update: jest.fn().mockResolvedValue({}), + remove: jest.fn().mockResolvedValue({}), + updateDoc: jest.fn().mockResolvedValue({}), + tx: jest.fn().mockResolvedValue({}) + } + + client = await GmailClient.create( + mockContext as any, + mockCredentials, + mockUser, + mockClient as any, + mockWorkspaceClient as any, + mockWorkspaceId, + mockStorageAdapter as any + ) + + // Replace the private TxOperations instance + Object.defineProperty(client, 'client', { + value: mockTxOperations, + writable: true + }) + + // Reset the gmail.messages.send mock + mockedGmailUsers.messages.send.mockClear() + }) + + it('should send message only once when called concurrently', async () => { + const newMessage = { + _id: 'test-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + // Call createMessage twice concurrently + const promise1 = client.createMessage(newMessage as any) + const promise2 = client.createMessage(newMessage as any) + + await Promise.all([promise1, promise2]) + + // Should only send once + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + + // Should log that duplicate was skipped + expect(mockContext.info).toHaveBeenCalledWith( + 'Message already being processed, skipping duplicate', + expect.objectContaining({ + messageKey: expect.stringContaining('v1-test-workspace-test-social-id-test-message-id') + }) + ) + }) + + it('should allow retry after first call completes', async () => { + const newMessage = { + _id: 'test-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + // First call + await client.createMessage(newMessage as any) + + // Reset mocks to check second call + mockedGmailUsers.messages.send.mockClear() + mockTxOperations.updateDoc.mockClear() + + // Second call after first completes - should succeed since lock is released + await client.createMessage(newMessage as any) + + // Should send again + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + expect(mockTxOperations.updateDoc).toHaveBeenCalledWith('class.NewMessage', 'space', 'test-message-id', { + status: 'sent' + }) + }) + + it('should skip message if already marked as sent', async () => { + const sentMessage = { + _id: 'sent-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'sent', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + await client.createMessage(sentMessage as any) + + // Should not send + expect(mockedGmailUsers.messages.send).not.toHaveBeenCalled() + }) + + it('should handle errors and still clean up lock', async () => { + const newMessage = { + _id: 'error-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + // Make send fail + mockedGmailUsers.messages.send.mockRejectedValueOnce(new Error('Send failed')) + + await client.createMessage(newMessage as any) + + // Should update status to error + expect(mockTxOperations.updateDoc).toHaveBeenCalledWith('class.NewMessage', 'space', 'error-message-id', { + status: 'error', + error: expect.any(String) + }) + + // Should be able to retry after first call completes (lock is released in finally) + mockedGmailUsers.messages.send.mockResolvedValueOnce({}) + mockTxOperations.updateDoc.mockClear() + + await client.createMessage(newMessage as any) + + // Should update status to sent this time + expect(mockTxOperations.updateDoc).toHaveBeenCalledWith('class.NewMessage', 'space', 'error-message-id', { + status: 'sent' + }) + }) + + it('should handle multiple different messages concurrently', async () => { + const message1 = { + _id: 'message-1', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient1@example.com', + subject: 'Subject 1', + content: '

Content 1

' + } + + const message2 = { + _id: 'message-2', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient2@example.com', + subject: 'Subject 2', + content: '

Content 2

' + } + + // Send two different messages concurrently + await Promise.all([client.createMessage(message1 as any), client.createMessage(message2 as any)]) + + // Should send both messages + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(2) + }) + }) + + describe('Deduplication - handleNewMessage (V2)', () => { + let client: GmailClient + let mockTxOperations: any + + beforeEach(async () => { + // Clear the global processing set before each test + ;(GmailClient as any).processingMessages.clear() + + // Don't use fake timers for V2 tests to avoid rate limiter issues + mockTxOperations = { + findAll: jest.fn().mockResolvedValue([]), + findOne: jest.fn().mockResolvedValue({ + _id: 'test-thread-id', + parent: 'test-channel-id' + }), + createDoc: jest.fn().mockResolvedValue({}), + update: jest.fn().mockResolvedValue({}), + remove: jest.fn().mockResolvedValue({}), + updateDoc: jest.fn().mockResolvedValue({}), + tx: jest.fn().mockResolvedValue({}), + getHierarchy: jest.fn().mockReturnValue({ + isDerived: jest.fn().mockReturnValue(false) + }) + } + + client = await GmailClient.create( + mockContext as any, + mockCredentials, + mockUser, + mockClient as any, + mockWorkspaceClient as any, + mockWorkspaceId, + mockStorageAdapter as any + ) + + // Replace the private TxOperations instance + Object.defineProperty(client, 'client', { + value: mockTxOperations, + writable: true + }) + + // Mock integration to be configured + Object.defineProperty(client, 'integration', { + value: { + _id: 'test-integration', + data: { + integrationVersion: 'v2', + spaceId: 'test-space-id' + } + }, + writable: true + }) + + // Mock the email property + Object.defineProperty(client, 'email', { + value: 'test@example.com', + writable: true + }) + + // Reset the gmail.messages.send mock + mockedGmailUsers.messages.send.mockClear() + mockedGmailUsers.messages.list.mockResolvedValue({ data: { messages: [] } }) + }) + + it('should send V2 message only once when called concurrently', async () => { + const messageEvent = { + _id: 'test-message-id', + messageId: 'msg-123', + cardId: 'card-456', + socialId: mockSocialId._id, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + // Call handleNewMessage twice concurrently + const promise1 = client.handleNewMessage(messageEvent as any) + const promise2 = client.handleNewMessage(messageEvent as any) + + await Promise.all([promise1, promise2]) + + // Should only send once + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + + // Should log that duplicate was skipped + expect(mockContext.info).toHaveBeenCalledWith( + 'Message already being processed, skipping duplicate', + expect.objectContaining({ + messageKey: expect.stringContaining('v2-test-workspace-test-social-id-msg-123-card-456') + }) + ) + }) + + it('should use _id as fallback when messageId is not present', async () => { + const messageEvent = { + _id: 'test-message-id', + cardId: 'card-456', + socialId: mockSocialId._id, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + // Call handleNewMessage twice concurrently + const promise1 = client.handleNewMessage(messageEvent as any) + const promise2 = client.handleNewMessage(messageEvent as any) + + await Promise.all([promise1, promise2]) + + // Should only send once + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + + // Should log with _id in messageKey + expect(mockContext.info).toHaveBeenCalledWith( + 'Message already being processed, skipping duplicate', + expect.objectContaining({ + messageKey: expect.stringContaining('v2-test-workspace-test-social-id-test-message-id-card-456') + }) + ) + }) + + it('should allow V2 retry after first call completes', async () => { + const messageEvent = { + _id: 'test-message-id', + messageId: 'msg-123', + cardId: 'card-456', + socialId: mockSocialId._id, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + // First call + await client.handleNewMessage(messageEvent as any) + + // Reset mocks to check second call + mockedGmailUsers.messages.send.mockClear() + + // Second call after first completes - should succeed since lock is released + await client.handleNewMessage(messageEvent as any) + + // Should send again + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + }) + + it('should skip message if it belongs to different social ID', async () => { + const messageEvent = { + _id: 'test-message-id', + messageId: 'msg-123', + cardId: 'card-456', + socialId: 'different-social-id' as PersonId, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + await client.handleNewMessage(messageEvent as any) + + // Should not send + expect(mockedGmailUsers.messages.send).not.toHaveBeenCalled() + }) + + it('should handle V2 errors and still clean up lock', async () => { + const messageEvent = { + _id: 'test-message-id', + messageId: 'msg-123', + cardId: 'card-456', + socialId: mockSocialId._id, + date: new Date('2024-01-01T12:00:00Z'), + content: 'Test message content' + } + + // Make send fail + mockedGmailUsers.messages.send.mockRejectedValueOnce(new Error('Send failed')) + + await client.handleNewMessage(messageEvent as any) + + // Should log error + expect(mockContext.error).toHaveBeenCalledWith('Send gmail message v2 error', expect.any(Object)) + + // Reset mocks for second call + mockedGmailUsers.messages.send.mockClear() + mockedGmailUsers.messages.send.mockResolvedValueOnce({}) + ;(mockContext.error as jest.Mock).mockClear() + + // Should be able to retry after first call completes (lock is released in finally) + await client.handleNewMessage(messageEvent as any) + + // Should send successfully this time (only 1 call since we cleared) + expect(mockedGmailUsers.messages.send).toHaveBeenCalledTimes(1) + expect(mockContext.error).not.toHaveBeenCalled() + }) + }) + + describe('Deduplication - Processing Set Cleanup', () => { + let client: GmailClient + + beforeEach(async () => { + client = await GmailClient.create( + mockContext as any, + mockCredentials, + mockUser, + mockClient as any, + mockWorkspaceClient as any, + mockWorkspaceId, + mockStorageAdapter as any + ) + }) + + it('should clean up message from global processing set after completion', async () => { + const mockTxOperations = { + findAll: jest.fn().mockResolvedValue([]), + findOne: jest.fn().mockResolvedValue(undefined), + updateDoc: jest.fn().mockResolvedValue({}) + } + + Object.defineProperty(client, 'client', { + value: mockTxOperations, + writable: true + }) + + const newMessage = { + _id: 'test-message-id', + _class: 'class.NewMessage', + space: 'space', + status: 'new', + from: mockSocialId._id, + to: 'recipient@example.com', + subject: 'Test Subject', + content: '

Test content

' + } + + // Get access to the global static processing set + const processingMessages = (GmailClient as any).processingMessages + + // Verify set is empty initially + expect(processingMessages.size).toBe(0) + + // Start processing a message + const promise = client.createMessage(newMessage as any) + + // Verify message is in processing set while processing + expect(processingMessages.size).toBeGreaterThan(0) + + await promise + + // Verify message is removed from processing set after completion + expect(processingMessages.size).toBe(0) + }) + }) }) diff --git a/services/gmail/pod-gmail/src/gmail.ts b/services/gmail/pod-gmail/src/gmail.ts index d554f8ccbfe..2c9a9e83134 100644 --- a/services/gmail/pod-gmail/src/gmail.ts +++ b/services/gmail/pod-gmail/src/gmail.ts @@ -102,6 +102,8 @@ async function wait (sec: number): Promise { } export class GmailClient { + private static readonly processingMessages = new Set() + private readonly account: AccountUuid private email: string private readonly tokenStorage: TokenStorage @@ -339,6 +341,22 @@ export class GmailClient { async createMessage (message: NewMessage): Promise { if (message.status === 'sent') return + + const messageKey = `v1-${this.user.workspace}-${this.socialId._id}-${message._id}` + + if (GmailClient.processingMessages.has(messageKey)) { + this.ctx.info('Message already being processed, skipping duplicate', { + messageKey, + messageId: message._id, + status: message.status, + email: this.email, + workspace: this.user.workspace + }) + return + } + + GmailClient.processingMessages.add(messageKey) + try { this.ctx.info('Send gmail message', { id: message._id, from: message.from, to: message.to }) const email = await this.getEmail() @@ -370,10 +388,26 @@ export class GmailClient { if (err?.response?.data?.error === 'invalid_grant') { await this.refreshToken() } + } finally { + GmailClient.processingMessages.delete(messageKey) } } async handleNewMessage (message: CreateMessageEvent): Promise { + const messageKey = `v2-${this.user.workspace}-${this.socialId._id}-${message.messageId ?? message._id}-${message.cardId}` + + if (GmailClient.processingMessages.has(messageKey)) { + this.ctx.info('Message already being processed, skipping duplicate', { + messageKey, + cardId: message.cardId, + email: this.email, + workspace: this.user.workspace + }) + return + } + + GmailClient.processingMessages.add(messageKey) + try { const personId = message.socialId if (personId !== this.socialId._id && !this.allSocialIds.has(personId)) { @@ -423,6 +457,8 @@ export class GmailClient { if (err?.response?.data?.error === 'invalid_grant') { await this.refreshToken() } + } finally { + GmailClient.processingMessages.delete(messageKey) } }