Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,20 @@ default Flux<MailboxMetaData> search(MailboxQuery expression, MailboxSession ses
*/
Publisher<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit);

/**
* Searches for messages matching the given query with collapse threads.
*
* @param expression
* not null
* @param session
* the context for this call, not null
* @param offset
* offset to apply on the result set
* @param limit
* max number of results to return
*/
Publisher<MessageId> searchWithCollapseThreads(MultimailboxesSearchQuery expression, MailboxSession session, long offset, long limit);

/**
* Returns the list of MessageId of messages belonging to that Thread
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,22 @@ public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mail
.take(limit);
}

@Override
public Flux<MessageId> searchWithCollapseThreads(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery,
long offset, long limit) {
Preconditions.checkArgument(session != null, "'session' is mandatory");

if (mailboxIds.isEmpty()) {
return Flux.empty();
}

return searcher.searchCollapsedByThreadId(mailboxIds, searchQuery, Math.toIntExact(offset), Math.toIntExact(limit),
MESSAGE_ID_FIELD, !SEARCH_HIGHLIGHT)
.handle(this::extractMessageIdFromHit)
.distinct()
.take(limit);
}

@Override
public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
return add(session, mailbox, message, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.apache.james.mailbox.opensearch.search.OpenSearchSearchHighlighter.ATTACHMENT_TEXT_CONTENT_FIELD;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -104,6 +105,18 @@ public Flux<Hit<ObjectNode>> search(Collection<MailboxId> mailboxIds, SearchQuer
.searchHits();
}

public Flux<Hit<ObjectNode>> searchCollapsedByThreadId(Collection<MailboxId> mailboxIds, SearchQuery query,
int offset, int limit, List<String> fields,
boolean searchHighlight) {
SearchRequest searchRequest = prepareCollapsedSearch(mailboxIds, query, offset, limit, fields, searchHighlight);
try {
return client.search(searchRequest)
.flatMapMany(response -> Flux.fromIterable(response.hits().hits()));
} catch (IOException e) {
return Flux.error(e);
}
}

private SearchRequest prepareSearch(Collection<MailboxId> mailboxIds, SearchQuery query,
Optional<Integer> limit, List<String> fields, boolean highlight) {
List<SortOptions> sorts = query.getSorts()
Expand All @@ -130,6 +143,33 @@ private SearchRequest prepareSearch(Collection<MailboxId> mailboxIds, SearchQuer
.build();
}

private SearchRequest prepareCollapsedSearch(Collection<MailboxId> mailboxIds, SearchQuery query, int offset, int limit,
List<String> fields, boolean highlight) {
List<SortOptions> sorts = query.getSorts()
.stream()
.flatMap(SortConverter::convertSort)
.map(fieldSort -> new SortOptions.Builder().field(fieldSort).build())
.collect(Collectors.toList());

SearchRequest.Builder request = new SearchRequest.Builder()
.index(aliasName.getValue())
.query(queryConverter.from(mailboxIds, query))
.from(offset)
.size(limit)
.storedFields(fields)
.sort(sorts)
.collapse(collapse -> collapse.field(JsonMessageConstants.THREAD_ID));

if (highlight) {
request.highlight(highlightQuery);
}

return toRoutingKey(mailboxIds)
.map(request::routing)
.orElse(request)
.build();
}

private Optional<String> toRoutingKey(Collection<MailboxId> mailboxIds) {
if (mailboxIds.size() < MAX_ROUTING_KEY) {
return Optional.of(mailboxIds.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ protected MessageId initOtherBasedMessageId() {
return InMemoryMessageId.of(1000);
}

@Override
protected boolean supportsThreadCollapse() {
return true;
}

@Test
void theDocumentShouldBeReindexWithNewMailboxWhenMoveMessages() throws Exception {
// Given mailboxA, mailboxB. Add message in mailboxA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,14 @@ public Flux<MessageId> search(MultimailboxesSearchQuery expression, MailboxSessi
.flatMapMany(Throwing.function(ids -> index.search(session, ids, expression.getSearchQuery(), limit)));
}

@Override
public Flux<MessageId> searchWithCollapseThreads(MultimailboxesSearchQuery expression, MailboxSession session, long offset, long limit) {
return getInMailboxIds(expression, session)
.filter(id -> !expression.getNotInMailboxes().contains(id))
.collect(ImmutableSet.toImmutableSet())
.flatMapMany(Throwing.function(ids -> index.searchWithCollapseThreads(session, ids, expression.getSearchQuery(), offset, limit)));
}

@Override
public Flux<MessageId> getThread(ThreadId threadId, MailboxSession session) {
return threadIdGuessingAlgorithm.getMessageIdsInThread(threadId, session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public interface MessageSearchIndex {
*/
Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException;

default Flux<MessageId> searchWithCollapseThreads(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery,
long offset, long limit) throws MailboxException {
return Flux.from(search(session, mailboxIds, searchQuery, offset + limit))
.skip(offset);
}

EnumSet<MailboxManager.SearchCapabilities> getSupportedCapabilities(EnumSet<MailboxManager.MessageCapabilities> messageCapabilities);

class SearchResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ protected void setUp() throws Exception {

protected abstract MessageId initOtherBasedMessageId();

protected boolean supportsThreadCollapse() {
return false;
}

@Test
void searchingMessageInMultipleMailboxShouldNotReturnTwiceTheSameMessage() throws MailboxException {
assumeTrue(messageIdManager != null);
Expand Down Expand Up @@ -1691,6 +1695,71 @@ void givenTwoDistinctThreadsThenGetThreadShouldNotReturnUnrelatedMails() throws
assertThat(actual).doesNotContain(message1.getMessageId(), message2.getMessageId());
}

@Test
void searchWithCollapseThreadsShouldReturnOneMessagePerThread() throws MailboxException {
assumeTrue(supportsThreadCollapse(), "This test is only relevant when collapseThread is supported");

ThreadId threadId1 = ThreadId.fromBaseMessageId(newBasedMessageId);
ThreadId threadId2 = ThreadId.fromBaseMessageId(otherBasedMessageId);
MailboxMessage message1 = createMessage(quanMailbox, threadId1);
MailboxMessage message2 = createMessage(quanMailbox, threadId1);
MailboxMessage message3 = createMessage(quanMailbox, threadId2);

// Thread 1: message1 and message2
// Thread 2: message3
appendMessageThenDispatchAddedEvent(quanMailbox, message1);
appendMessageThenDispatchAddedEvent(quanMailbox, message2);
appendMessageThenDispatchAddedEvent(quanMailbox, message3);

awaitMessageCount(ImmutableList.of(quanMailbox.getMailboxId()), SearchQuery.matchAll(), 3);

List<MessageId> allCollapsedMessages = messageSearchIndex.searchWithCollapseThreads(quanSession,
ImmutableList.of(quanMailbox.getMailboxId()), SearchQuery.matchAll(), 0, 10)
.collectList().block();

assertThat(allCollapsedMessages).containsExactly(message1.getMessageId(), message3.getMessageId());
}

@Test
void searchWithCollapseThreadsShouldApplyPaginationOnCollapsedList() throws MailboxException {
assumeTrue(supportsThreadCollapse(), "This test is only relevant when collapseThread is supported");

ThreadId threadId1 = ThreadId.fromBaseMessageId(newBasedMessageId);
ThreadId threadId2 = ThreadId.fromBaseMessageId(otherBasedMessageId);
ThreadId threadId3 = ThreadId.fromBaseMessageId(messageIdFactory.generate());

// Thread1 has two messages; thread2 and thread3 have one each.
MailboxMessage thread1Older = createMessage(quanMailbox, threadId1, new Date(1000));
MailboxMessage thread1Newer = createMessage(quanMailbox, threadId1, new Date(3000));
MailboxMessage thread2Message = createMessage(quanMailbox, threadId2, new Date(2000));
MailboxMessage thread3Message = createMessage(quanMailbox, threadId3, new Date(500));

appendMessageThenDispatchAddedEvent(quanMailbox, thread1Older);
appendMessageThenDispatchAddedEvent(quanMailbox, thread1Newer);
appendMessageThenDispatchAddedEvent(quanMailbox, thread2Message);
appendMessageThenDispatchAddedEvent(quanMailbox, thread3Message);

awaitMessageCount(ImmutableList.of(quanMailbox.getMailboxId()), SearchQuery.matchAll(), 4);

SearchQuery searchQuery = SearchQuery.builder()
.sorts(new Sort(SortClause.Arrival, Order.REVERSE))
.build();

// With Arrival sort descending, the collapsed list should be ordered by:
// 1) thread1 (newest at date=3000), 2) thread2 (date=2000), 3) thread3 (date=500).
List<MessageId> allCollapsedMessages = messageSearchIndex.searchWithCollapseThreads(quanSession,
ImmutableList.of(quanMailbox.getMailboxId()), searchQuery, 0, 10)
.collectList().block();

// Request offset=1, limit=2 to fetch from the second message (thread2, thread3).
List<MessageId> paginatedCollapsedMessages = messageSearchIndex.searchWithCollapseThreads(quanSession,
ImmutableList.of(quanMailbox.getMailboxId()), searchQuery, 1, 2)
.collectList().block();

assertThat(allCollapsedMessages).containsExactly(thread1Newer.getMessageId(), thread2Message.getMessageId(), thread3Message.getMessageId());
assertThat(paginatedCollapsedMessages).containsExactly(thread2Message.getMessageId(), thread3Message.getMessageId());
}

@Test
void givenNonThreadThenGetThreadShouldReturnEmptyListMessageId() throws MailboxException {
// given non messages in thread1
Expand Down Expand Up @@ -1718,12 +1787,16 @@ private void appendMessageThenDispatchAddedEvent(Mailbox mailbox, MailboxMessage
}

private SimpleMailboxMessage createMessage(Mailbox mailbox, ThreadId threadId) {
return createMessage(mailbox, threadId, new Date());
}

private SimpleMailboxMessage createMessage(Mailbox mailbox, ThreadId threadId, Date internalDate) {
MessageId messageId = messageIdFactory.generate();
String content = "Some content";
int bodyStart = 16;
return new SimpleMailboxMessage(messageId,
threadId,
new Date(),
internalDate,
content.length(),
bodyStart,
new ByteContent(content.getBytes()),
Expand Down
Loading