diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java index 26f1f9087b2..c07cf8c198e 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java @@ -386,6 +386,20 @@ default Flux search(MailboxQuery expression, MailboxSession ses */ Publisher search(MultimailboxesSearchQuery expression, MailboxSession session, long limit); + /** + * Searches for messages matching the given query with collapse threads. + * + * @param expression + * not null + * @param session + * the context for this call, not null + * @param offset + * offset to apply on the result set + * @param limit + * max number of results to return + */ + Publisher searchWithCollapseThreads(MultimailboxesSearchQuery expression, MailboxSession session, long offset, long limit); + /** * Returns the list of MessageId of messages belonging to that Thread * diff --git a/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java b/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java index c88bd7dba71..394a9541fac 100644 --- a/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java +++ b/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/events/OpenSearchListeningMessageSearchIndex.java @@ -375,6 +375,22 @@ public Flux search(MailboxSession session, Collection mail .take(limit); } + @Override + public Flux searchWithCollapseThreads(MailboxSession session, Collection mailboxIds, SearchQuery searchQuery, + long offset, long limit) { + Preconditions.checkArgument(session != null, "'session' is mandatory"); + + if (mailboxIds.isEmpty()) { + return Flux.empty(); + } + + return searcher.searchCollapsedByThreadId(mailboxIds, searchQuery, Math.toIntExact(offset), Math.toIntExact(limit), + MESSAGE_ID_FIELD, !SEARCH_HIGHLIGHT) + .handle(this::extractMessageIdFromHit) + .distinct() + .take(limit); + } + @Override public Mono add(MailboxSession session, Mailbox mailbox, MailboxMessage message) { return add(session, mailbox, message, Optional.empty()); diff --git a/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcher.java b/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcher.java index 3d61f747a84..138622ee0f1 100644 --- a/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcher.java +++ b/mailbox/opensearch/src/main/java/org/apache/james/mailbox/opensearch/search/OpenSearchSearcher.java @@ -21,6 +21,7 @@ import static org.apache.james.mailbox.opensearch.search.OpenSearchSearchHighlighter.ATTACHMENT_TEXT_CONTENT_FIELD; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -104,6 +105,18 @@ public Flux> search(Collection mailboxIds, SearchQuer .searchHits(); } + public Flux> searchCollapsedByThreadId(Collection mailboxIds, SearchQuery query, + int offset, int limit, List fields, + boolean searchHighlight) { + SearchRequest searchRequest = prepareCollapsedSearch(mailboxIds, query, offset, limit, fields, searchHighlight); + try { + return client.search(searchRequest) + .flatMapMany(response -> Flux.fromIterable(response.hits().hits())); + } catch (IOException e) { + return Flux.error(e); + } + } + private SearchRequest prepareSearch(Collection mailboxIds, SearchQuery query, Optional limit, List fields, boolean highlight) { List sorts = query.getSorts() @@ -130,6 +143,33 @@ private SearchRequest prepareSearch(Collection mailboxIds, SearchQuer .build(); } + private SearchRequest prepareCollapsedSearch(Collection mailboxIds, SearchQuery query, int offset, int limit, + List fields, boolean highlight) { + List sorts = query.getSorts() + .stream() + .flatMap(SortConverter::convertSort) + .map(fieldSort -> new SortOptions.Builder().field(fieldSort).build()) + .collect(Collectors.toList()); + + SearchRequest.Builder request = new SearchRequest.Builder() + .index(aliasName.getValue()) + .query(queryConverter.from(mailboxIds, query)) + .from(offset) + .size(limit) + .storedFields(fields) + .sort(sorts) + .collapse(collapse -> collapse.field(JsonMessageConstants.THREAD_ID)); + + if (highlight) { + request.highlight(highlightQuery); + } + + return toRoutingKey(mailboxIds) + .map(request::routing) + .orElse(request) + .build(); + } + private Optional toRoutingKey(Collection mailboxIds) { if (mailboxIds.size() < MAX_ROUTING_KEY) { return Optional.of(mailboxIds.stream() diff --git a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java index 4a3182dafd7..a6b2bcf38c8 100644 --- a/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java +++ b/mailbox/opensearch/src/test/java/org/apache/james/mailbox/opensearch/OpenSearchIntegrationTest.java @@ -185,6 +185,11 @@ protected MessageId initOtherBasedMessageId() { return InMemoryMessageId.of(1000); } + @Override + protected boolean supportsThreadCollapse() { + return true; + } + @Test void theDocumentShouldBeReindexWithNewMailboxWhenMoveMessages() throws Exception { // Given mailboxA, mailboxB. Add message in mailboxA diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index 16bf7c12cd9..710b429fa14 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -952,6 +952,14 @@ public Flux search(MultimailboxesSearchQuery expression, MailboxSessi .flatMapMany(Throwing.function(ids -> index.search(session, ids, expression.getSearchQuery(), limit))); } + @Override + public Flux searchWithCollapseThreads(MultimailboxesSearchQuery expression, MailboxSession session, long offset, long limit) { + return getInMailboxIds(expression, session) + .filter(id -> !expression.getNotInMailboxes().contains(id)) + .collect(ImmutableSet.toImmutableSet()) + .flatMapMany(Throwing.function(ids -> index.searchWithCollapseThreads(session, ids, expression.getSearchQuery(), offset, limit))); + } + @Override public Flux getThread(ThreadId threadId, MailboxSession session) { return threadIdGuessingAlgorithm.getMessageIdsInThread(threadId, session); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java index 30269adcd2c..499ca8856f2 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java @@ -52,6 +52,12 @@ public interface MessageSearchIndex { */ Flux search(MailboxSession session, Collection mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException; + default Flux searchWithCollapseThreads(MailboxSession session, Collection mailboxIds, SearchQuery searchQuery, + long offset, long limit) throws MailboxException { + return Flux.from(search(session, mailboxIds, searchQuery, offset + limit)) + .skip(offset); + } + EnumSet getSupportedCapabilities(EnumSet messageCapabilities); class SearchResult { diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java index ca66422660d..590ae12aecd 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java @@ -289,6 +289,10 @@ protected void setUp() throws Exception { protected abstract MessageId initOtherBasedMessageId(); + protected boolean supportsThreadCollapse() { + return false; + } + @Test void searchingMessageInMultipleMailboxShouldNotReturnTwiceTheSameMessage() throws MailboxException { assumeTrue(messageIdManager != null); @@ -1691,6 +1695,71 @@ void givenTwoDistinctThreadsThenGetThreadShouldNotReturnUnrelatedMails() throws assertThat(actual).doesNotContain(message1.getMessageId(), message2.getMessageId()); } + @Test + void searchWithCollapseThreadsShouldReturnOneMessagePerThread() throws MailboxException { + assumeTrue(supportsThreadCollapse(), "This test is only relevant when collapseThread is supported"); + + ThreadId threadId1 = ThreadId.fromBaseMessageId(newBasedMessageId); + ThreadId threadId2 = ThreadId.fromBaseMessageId(otherBasedMessageId); + MailboxMessage message1 = createMessage(quanMailbox, threadId1); + MailboxMessage message2 = createMessage(quanMailbox, threadId1); + MailboxMessage message3 = createMessage(quanMailbox, threadId2); + + // Thread 1: message1 and message2 + // Thread 2: message3 + appendMessageThenDispatchAddedEvent(quanMailbox, message1); + appendMessageThenDispatchAddedEvent(quanMailbox, message2); + appendMessageThenDispatchAddedEvent(quanMailbox, message3); + + awaitMessageCount(ImmutableList.of(quanMailbox.getMailboxId()), SearchQuery.matchAll(), 3); + + List allCollapsedMessages = messageSearchIndex.searchWithCollapseThreads(quanSession, + ImmutableList.of(quanMailbox.getMailboxId()), SearchQuery.matchAll(), 0, 10) + .collectList().block(); + + assertThat(allCollapsedMessages).containsExactly(message1.getMessageId(), message3.getMessageId()); + } + + @Test + void searchWithCollapseThreadsShouldApplyPaginationOnCollapsedList() throws MailboxException { + assumeTrue(supportsThreadCollapse(), "This test is only relevant when collapseThread is supported"); + + ThreadId threadId1 = ThreadId.fromBaseMessageId(newBasedMessageId); + ThreadId threadId2 = ThreadId.fromBaseMessageId(otherBasedMessageId); + ThreadId threadId3 = ThreadId.fromBaseMessageId(messageIdFactory.generate()); + + // Thread1 has two messages; thread2 and thread3 have one each. + MailboxMessage thread1Older = createMessage(quanMailbox, threadId1, new Date(1000)); + MailboxMessage thread1Newer = createMessage(quanMailbox, threadId1, new Date(3000)); + MailboxMessage thread2Message = createMessage(quanMailbox, threadId2, new Date(2000)); + MailboxMessage thread3Message = createMessage(quanMailbox, threadId3, new Date(500)); + + appendMessageThenDispatchAddedEvent(quanMailbox, thread1Older); + appendMessageThenDispatchAddedEvent(quanMailbox, thread1Newer); + appendMessageThenDispatchAddedEvent(quanMailbox, thread2Message); + appendMessageThenDispatchAddedEvent(quanMailbox, thread3Message); + + awaitMessageCount(ImmutableList.of(quanMailbox.getMailboxId()), SearchQuery.matchAll(), 4); + + SearchQuery searchQuery = SearchQuery.builder() + .sorts(new Sort(SortClause.Arrival, Order.REVERSE)) + .build(); + + // With Arrival sort descending, the collapsed list should be ordered by: + // 1) thread1 (newest at date=3000), 2) thread2 (date=2000), 3) thread3 (date=500). + List allCollapsedMessages = messageSearchIndex.searchWithCollapseThreads(quanSession, + ImmutableList.of(quanMailbox.getMailboxId()), searchQuery, 0, 10) + .collectList().block(); + + // Request offset=1, limit=2 to fetch from the second message (thread2, thread3). + List paginatedCollapsedMessages = messageSearchIndex.searchWithCollapseThreads(quanSession, + ImmutableList.of(quanMailbox.getMailboxId()), searchQuery, 1, 2) + .collectList().block(); + + assertThat(allCollapsedMessages).containsExactly(thread1Newer.getMessageId(), thread2Message.getMessageId(), thread3Message.getMessageId()); + assertThat(paginatedCollapsedMessages).containsExactly(thread2Message.getMessageId(), thread3Message.getMessageId()); + } + @Test void givenNonThreadThenGetThreadShouldReturnEmptyListMessageId() throws MailboxException { // given non messages in thread1 @@ -1718,12 +1787,16 @@ private void appendMessageThenDispatchAddedEvent(Mailbox mailbox, MailboxMessage } private SimpleMailboxMessage createMessage(Mailbox mailbox, ThreadId threadId) { + return createMessage(mailbox, threadId, new Date()); + } + + private SimpleMailboxMessage createMessage(Mailbox mailbox, ThreadId threadId, Date internalDate) { MessageId messageId = messageIdFactory.generate(); String content = "Some content"; int bodyStart = 16; return new SimpleMailboxMessage(messageId, threadId, - new Date(), + internalDate, content.length(), bodyStart, new ByteContent(content.getBytes()), diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailQueryMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailQueryMethodContract.scala index 079f193f552..ebc55a5dc31 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailQueryMethodContract.scala +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailQueryMethodContract.scala @@ -40,6 +40,7 @@ import org.apache.james.jmap.core.ResponseObject.SESSION_STATE import org.apache.james.jmap.core.UTCDate import org.apache.james.jmap.http.UserCredential import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder} +import org.apache.james.jmap.rfc8621.contract.tags.CategoryTags import org.apache.james.mailbox.FlagsBuilder import org.apache.james.mailbox.MessageManager.AppendCommand import org.apache.james.mailbox.model.MailboxACL.Right @@ -54,7 +55,7 @@ import org.apache.james.util.ClassLoaderUtils import org.apache.james.utils.DataProbeImpl import org.awaitility.Awaitility import org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS -import org.junit.jupiter.api.{BeforeEach, Test} +import org.junit.jupiter.api.{BeforeEach, Tag, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource} import org.threeten.extra.Seconds @@ -7759,6 +7760,152 @@ trait EmailQueryMethodContract { } } + @Test + def collapseThreadsShouldApplyOnSearchIndexPath(server: GuiceJamesServer): Unit = { + val thread1Message: Message = buildTestThreadMessage("thread-1", "Message-ID-1") + val thread2Message: Message = buildTestThreadMessage("thread-2", "Message-ID-2") + + val threeDaysBefore = Date.from(ZonedDateTime.now().minusDays(3).toInstant) + val twoDaysBefore = Date.from(ZonedDateTime.now().minusDays(2).toInstant) + val oneDayBefore = Date.from(ZonedDateTime.now().minusDays(1).toInstant) + val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB)) + + // Thread 1: message 1 (oldest), message 3 (newest) + // Thread 2: message 2 + val messageId1: MessageId = sendMessageToBobInbox(server, thread1Message, threeDaysBefore) + val messageId2: MessageId = sendMessageToBobInbox(server, thread2Message, twoDaysBefore) + val messageId3: MessageId = sendMessageToBobInbox(server, thread1Message, oneDayBefore) + + val request: String = + s"""{ + | "using": [ + | "urn:ietf:params:jmap:core", + | "urn:ietf:params:jmap:mail"], + | "methodCalls": [[ + | "Email/query", + | { + | "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6", + | "filter": { + | "inMailbox": "${mailboxId.serialize()}", + | "text": "testmail" + | }, + | "sort": [{ + | "property":"receivedAt", + | "isAscending": false + | }], + | "collapseThreads": true + | }, + | "c1"]] + |}""".stripMargin + + awaitAtMostTenSeconds.untilAsserted { () => + val response = `given` + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .body(request) + .when + .post + .`then` + .statusCode(SC_OK) + .contentType(JSON) + .extract + .body + .asString + + assertThatJson(response).isEqualTo( + s"""{ + | "sessionState": "${SESSION_STATE.value}", + | "methodResponses": [[ + | "Email/query", + | { + | "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6", + | "queryState": "${generateQueryState(messageId3, messageId2)}", + | "canCalculateChanges": false, + | "position": 0, + | "limit": 256, + | "ids": ["${messageId3.serialize}", "${messageId2.serialize}"] + | }, + | "c1" + | ]] + |}""".stripMargin) + } + } + + @Test + @Tag(CategoryTags.BASIC_FEATURE) + def collapseThreadsShouldApplyPaginationOnCollapsedResults(server: GuiceJamesServer): Unit = { + val thread1Message: Message = buildTestThreadMessage("thread-1", "Message-ID-1") + val thread2Message: Message = buildTestThreadMessage("thread-2", "Message-ID-2") + val thread3Message: Message = buildTestThreadMessage("thread-3", "Message-ID-3") + + val fourDaysBefore = Date.from(ZonedDateTime.now().minusDays(4).toInstant) + val threeDaysBefore = Date.from(ZonedDateTime.now().minusDays(3).toInstant) + val twoDaysBefore = Date.from(ZonedDateTime.now().minusDays(2).toInstant) + val oneDayBefore = Date.from(ZonedDateTime.now().minusDays(1).toInstant) + val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB)) + + // Thread 1: message 1 (oldest), message 2 (newest) + // Thread 2: message 3 + // Thread 3: message 4 + val messageId1: MessageId = sendMessageToBobInbox(server, thread1Message, twoDaysBefore) + val messageId2: MessageId = sendMessageToBobInbox(server, thread1Message, oneDayBefore) + val messageId3: MessageId = sendMessageToBobInbox(server, thread2Message, threeDaysBefore) + val messageId4: MessageId = sendMessageToBobInbox(server, thread3Message, fourDaysBefore) + + val request: String = + s"""{ + | "using": [ + | "urn:ietf:params:jmap:core", + | "urn:ietf:params:jmap:mail"], + | "methodCalls": [[ + | "Email/query", + | { + | "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6", + | "filter": { + | "inMailbox": "${mailboxId.serialize()}", + | "text": "testmail" + | }, + | "sort": [{ + | "property":"receivedAt", + | "isAscending": false + | }], + | "position": 1, + | "limit": 2, + | "collapseThreads": true + | }, + | "c1"]] + |}""".stripMargin + + awaitAtMostTenSeconds.untilAsserted { () => + val response: String = `given` + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .body(request) + .when + .post + .`then` + .statusCode(SC_OK) + .contentType(JSON) + .extract + .body + .asString + + assertThatJson(response).isEqualTo( + s"""{ + | "sessionState": "${SESSION_STATE.value}", + | "methodResponses": [[ + | "Email/query", + | { + | "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6", + | "queryState": "${generateQueryState(messageId3, messageId4)}", + | "canCalculateChanges": false, + | "position": 1, + | "ids": ["${messageId3.serialize}", "${messageId4.serialize}"] + | }, + | "c1" + | ]] + |}""".stripMargin) + } + } + private def sendMessageToBobInbox(server: GuiceJamesServer, message: Message, requestDate: Date): MessageId = { server.getProbe(classOf[MailboxProbeImpl]) .appendMessage(BOB.asString, MailboxPath.inbox(BOB), diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEmailQueryMethodNoViewTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEmailQueryMethodNoViewTest.java index 49cd6cc5b3f..7cf7cdf2563 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEmailQueryMethodNoViewTest.java +++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEmailQueryMethodNoViewTest.java @@ -101,4 +101,16 @@ public void inMailboxSortedBySentAtShouldCollapseThreads(GuiceJamesServer server @Disabled("JAMES-3340 Not supported for no email query view") public void inMailboxBeforeSortedByReceivedAtShouldCollapseThreads(GuiceJamesServer server) { } + + @Test + @Override + @Disabled("JAMES-4166 Lucene implementation does not support collapseThreads yet") + public void collapseThreadsShouldApplyOnSearchIndexPath(GuiceJamesServer server) { + } + + @Test + @Override + @Disabled("JAMES-4166 Lucene implementation does not support collapseThreads yet") + public void collapseThreadsShouldApplyPaginationOnCollapsedResults(GuiceJamesServer server) { + } } diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEmailQueryMethodTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEmailQueryMethodTest.java index f5152dd2226..69626013e4f 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEmailQueryMethodTest.java +++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEmailQueryMethodTest.java @@ -52,4 +52,16 @@ public void shouldListMailsReceivedAfterADate(GuiceJamesServer server) { EmailQueryMethodContract.super.shouldListMailsReceivedAfterADate(server); } + @Test + @Override + @Disabled("JAMES-4166 Lucene implementation does not support collapseThreads yet") + public void collapseThreadsShouldApplyOnSearchIndexPath(GuiceJamesServer server) { + } + + @Test + @Override + @Disabled("JAMES-4166 Lucene implementation does not support collapseThreads yet") + public void collapseThreadsShouldApplyPaginationOnCollapsedResults(GuiceJamesServer server) { + } + } diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailQueryMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailQueryMethod.scala index e164e3bba3f..dc4c725ca6b 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailQueryMethod.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailQueryMethod.scala @@ -92,6 +92,7 @@ class EmailQueryMethod @Inject() (serializer: EmailQuerySerializer, } private def executeQuery(session: MailboxSession, request: EmailQueryRequest, searchQuery: MultimailboxesSearchQuery, position: Position, limit: Limit): SMono[EmailQueryResponse] = { + val collapseThreads: Boolean = getCollapseThreads(request) val ids: SMono[Seq[MessageId]] = request match { case request: EmailQueryRequest if matchesInMailboxSortedBySentAt(request) => queryViewForListingSortedBySentAt(session, position, limit, request, searchQuery.getNamespace) @@ -101,7 +102,7 @@ class EmailQueryMethod @Inject() (serializer: EmailQuerySerializer, queryViewForContentAfterSortedByReceivedAt(session, position, limit, request, searchQuery.getNamespace) case request: EmailQueryRequest if matchesInMailboxBeforeSortedByReceivedAt(request) => queryViewForContentBeforeSortedByReceivedAt(session, position, limit, request, searchQuery.getNamespace) - case _ => executeQueryAgainstSearchIndex(session, searchQuery, position, limit) + case _ => executeQueryAgainstSearchIndex(session, searchQuery, position, limit, collapseThreads) } ids.map(ids => toResponse(request, position, limit, ids)) @@ -199,13 +200,23 @@ class EmailQueryMethod @Inject() (serializer: EmailQuerySerializer, position = position, limit = Some(limitToUse).filterNot(used => request.limit.map(_.value).contains(used.value))) - private def executeQueryAgainstSearchIndex(mailboxSession: MailboxSession, searchQuery: MultimailboxesSearchQuery, position: Position, limitToUse: Limit): SMono[Seq[MessageId]] = - SFlux.fromPublisher(mailboxManager.search( - searchQuery.addCriterion(SearchQuery.flagIsUnSet(DELETED)), - mailboxSession, - position.value + limitToUse)) - .drop(position.value) - .collectSeq() + private def executeQueryAgainstSearchIndex(mailboxSession: MailboxSession, searchQuery: MultimailboxesSearchQuery, + position: Position, limitToUse: Limit, collapseThreads: Boolean): SMono[Seq[MessageId]] = + if (collapseThreads) { + SFlux.fromPublisher(mailboxManager.searchWithCollapseThreads( + searchQuery.addCriterion(SearchQuery.flagIsUnSet(DELETED)), + mailboxSession, + position.value, + limitToUse.value)) + .collectSeq() + } else { + SFlux.fromPublisher(mailboxManager.search( + searchQuery.addCriterion(SearchQuery.flagIsUnSet(DELETED)), + mailboxSession, + position.value + limitToUse.value)) + .drop(position.value) + .collectSeq() + } private def searchQueryFromRequest(request: EmailQueryRequest, capabilities: Set[CapabilityIdentifier], session: MailboxSession): Either[UnsupportedOperationException, MultimailboxesSearchQuery] = { val comparators: List[Comparator] = request.sort.getOrElse(Set()).toList