Time Ordering Option on Small-Result-Set Scan Queries #7024
justinborromeo wants to merge 49 commits into apache:master
Conversation
    private int limit;

    @Param({"none", "descending", "ascending"})
    private static String timeOrdering;
Any reason this is static when the others are not? If not, please adjust it to be in line with the others.
The basicA(), basicB()... query builder functions are static (like in SearchBenchmark). Since timeOrdering is used when building the query, it also needs to be static.
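To make the constraint concrete, here is a minimal sketch (hypothetical class and method names, not the benchmark's real code) of why a field read by a static builder must itself be static:

    public class BenchmarkSketch
    {
      // Must be static because basicA() below is static and reads it.
      private static String timeOrdering = "none";

      private static String basicA()
      {
        // A static method cannot reference instance fields; a non-static
        // timeOrdering here would not compile.
        return "scan query with timeOrder=" + timeOrdering;
      }
    }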
    query is that the Scan query does not retain all the returned rows in memory before they are returned to the client
    (except when time-ordering is used). The Select query _will_ retain the rows in memory, causing memory pressure if too
    many rows are returned. The Scan query can return all the rows without issuing another pagination query, which is
    extremely useful when directly querying against historical or realtime nodes.
We're trying to harmonize language in this area (see #6916); in that context "Historical processes or streaming ingestion tasks" is more Ministry of Truth approved than "historical or realtime nodes". For clarification I'd add something to call out expected usage. One way to tie it all together is:
In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued directly to Historical processes or streaming ingestion tasks. This can be useful if you want to retrieve large amounts of data in parallel.
      return this;
    }

    public ScanQueryBuilder timeOrder(String timeOrder)
This'd be better as an enum, like Direction in OrderByColumnSpec. It reduces the likelihood of bugs since it makes invalid values impossible. (And as a minor side benefit, will take up less memory.)
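As a rough sketch (names and Jackson annotations assumed, modeled loosely on OrderByColumnSpec.Direction), such an enum might look like this; invalid values would then fail fast during JSON deserialization instead of surfacing as behavioral bugs later:

    import java.util.Locale;
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonValue;

    // Hypothetical replacement for the String-typed timeOrder field.
    public enum TimeOrder
    {
      ASCENDING,
      DESCENDING,
      NONE;

      // Serialize as the lowercase names already used in query JSON.
      @JsonValue
      public String toJson()
      {
        return name().toLowerCase(Locale.ENGLISH);
      }

      // valueOf() throws on any value outside the three valid ones,
      // which Jackson reports as a deserialization error.
      @JsonCreator
      public static TimeOrder fromString(String name)
      {
        return valueOf(name.toUpperCase(Locale.ENGLISH));
      }
    }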
    public static final String TIME_ORDER_ASCENDING = "ascending";
    public static final String TIME_ORDER_DESCENDING = "descending";
    public static final String TIME_ORDER_NONE = "none";
You'll be able to get rid of these after changing timeOrder to an enum.
Does it make sense to make result format an enum too?
      return this;
    }

    // int should suffice here because no one should be sorting greater than 2B rows in memory
Also, Java collections can't store more than Integer.MAX_VALUE items anyway.
    /**
     * This iterator supports iteration through any Iterable of unbatched ScanResultValues (1 event/SRV) and aggregates
     * events into ScanResultValues with {int batchSize} events. The columns from the first event per ScanResultValue
{@code batchSize} is more javadoc-y.
    ScanResultValue srv = itr.next();
    // Only replace once using the columns from the first event
    columns = columns.isEmpty() ? srv.getColumns() : columns;
    eventsToAdd.add(((List) srv.getEvents()).get(0));
If it is a precondition that srv.getEvents() should only have one element, use Iterables.getOnlyElement((List) srv.getEvents()) instead, which will throw an exception if the precondition is not satisfied. Just doing .get(0) could mask potential bugs.
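For illustration, the suggested change side by side (Guava's Iterables.getOnlyElement throws if the list is empty or contains more than one element):

    // Before: silently ignores any events after the first.
    eventsToAdd.add(((List) srv.getEvents()).get(0));

    // After: fails fast if the one-event-per-ScanResultValue precondition
    // is ever violated.
    eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));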
      columns = columns.isEmpty() ? srv.getColumns() : columns;
      eventsToAdd.add(((List) srv.getEvents()).get(0));
    }
    return new ScanResultValue(null, columns, eventsToAdd);
Please mark String segmentId (and getSegmentId) as @Nullable in ScanResultValue.
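A trimmed sketch of the requested annotation (the real class carries more fields; only segmentId is shown here):

    import javax.annotation.Nullable;

    public class ScanResultValue
    {
      // Null for ScanResultValues produced by merging batches, as in the
      // snippet above.
      @Nullable
      private final String segmentId;

      public ScanResultValue(@Nullable String segmentId)
      {
        this.segmentId = segmentId;
      }

      @Nullable
      public String getSegmentId()
      {
        return segmentId;
      }
    }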
     * events into ScanResultValues with {int batchSize} events. The columns from the first event per ScanResultValue
     * will be used to populate the column section.
     */
    private static class ScanBatchedTimeOrderedIterator implements CloseableIterator<ScanResultValue>
This doesn't seem to be doing anything related to time-ordering; why include "TimeOrdered" in the name?
    } else if ((scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) ||
                scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING))
               && scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) {
      Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
This looks like it will apply the limit before sorting takes place. Am I reading it right? It needs to be the other way around, otherwise we won't be guaranteed to get the very earliest or very latest rows.
Also, for a Scan query with a limit, in order to make sure we read the very earliest or very latest rows, we need to make sure to iterate through segments in the right order. Otherwise we are going to need to read the first (or last) limit rows out of every segment to be sure we got the right ones, which we want to avoid. The idea should be to read segments in ascending (or descending) order, and stop reading once we know that no as-yet-unread segments could possibly offer us any earlier (or later) events, based on those segments' data intervals.
Ah shoot, I misunderstood part of the issue. I wrote this under the assumption that it would act like an ORDER BY operator. So if I understand correctly now, this query:
    {
      ...
      "timeOrdering": "descending",
      "limit": 100
    }
should return the latest 100 rows?
Conversely, this query:
    {
      ...
      "timeOrdering": "ascending",
      "limit": 100
    }
should return the earliest 100 rows?
Yes, that's correct. It is how ORDER BY works in SQL, too (ordering happens before limiting).
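To make the sort-before-limit semantics concrete, here is a minimal, self-contained sketch (not the PR's actual classes) that keeps only the latest limit timestamps while scanning, then emits them in descending order:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.PriorityQueue;

    public class TopTimeRowsSketch
    {
      public static Deque<Long> latest(Iterator<Long> timestamps, int limit)
      {
        // Min-heap of at most `limit` entries: the earliest retained
        // timestamp sits at the head and is evicted first, so after the
        // scan only the latest `limit` timestamps remain.
        PriorityQueue<Long> q = new PriorityQueue<>(limit);
        while (timestamps.hasNext()) {
          q.offer(timestamps.next());
          if (q.size() > limit) {
            q.poll();
          }
        }
        // poll() returns ascending order; addFirst reverses it so the
        // result reads latest-first (descending).
        Deque<Long> sorted = new ArrayDeque<>(q.size());
        while (!q.isEmpty()) {
          sorted.addFirst(q.poll());
        }
        return sorted;
      }
    }

The drain step mirrors the ArrayDeque#addFirst() pattern discussed in the next thread.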
    final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
    while (q.size() != 0) {
      // We add at the front of the list because poll removes the tail of the queue.
      sortedElements.addFirst(q.poll());
Just want to double-check, but ArrayDeque#addFirst() is O(1), right? I initially used a LinkedList, but Forbidden APIs said no.
This is the implementation:

    public void addFirst(E e) {
        if (e == null)
            throw new NullPointerException();
        elements[head = (head - 1) & (elements.length - 1)] = e;
        if (head == tail)
            doubleCapacity();
    }

It looks O(1) on an amortized basis to me. Most of the function is O(1), except for doubleCapacity(), which is O(n) but runs at most once per n additions, so the total cost of all doublings over n insertions is O(n).
Closing in favour of the refactored PR: #7133. The updated PR incorporates the suggestions made here.
This PR addresses point 2 of issue #6088. For scan queries where the limit is less than a configurable threshold (default 100K), users have the option to time-order their results, either ascending or descending. Users can also skip time-ordering by setting timeOrder to none, in which case the existing scan implementation runs. The number of records that can be ordered is limited because the ordering currently happens in memory. There are plans to add on-disk sorting for larger result sets in the future, but those changes are outside the scope of this PR.
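For illustration, a sketch of how the option might be exercised through the ScanQueryBuilder shown in the diff above (setter names other than timeOrder, and the interval/spec helpers, are assumed):

    // Hypothetical usage; imports (MultipleIntervalSegmentSpec, Intervals,
    // Collections) omitted for brevity, and setter names are assumed.
    ScanQuery query = ScanQuery.newScanQueryBuilder()
        .dataSource("wikipedia")
        .intervals(new MultipleIntervalSegmentSpec(
            Collections.singletonList(Intervals.of("2015-09-12/2015-09-13"))))
        .timeOrder(ScanQuery.TIME_ORDER_DESCENDING)  // constant from this PR
        .limit(100)                                   // the latest 100 rows
        .build();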