Optimize MSQ realtime queries#15399
Conversation
| queue.offer( | ||
| InputStreamHolder.fromStream( | ||
| new InputStream() | ||
| { | ||
| @Override | ||
| public int read() throws IOException | ||
| { | ||
| if (th != null) { | ||
| throw new IOException(msg, th); | ||
| } else { | ||
| throw new IOException(msg); | ||
| } | ||
| } | ||
| }, | ||
| -1, | ||
| 0 | ||
| ) | ||
| ); |
Check notice
Code scanning / CodeQL
Ignored error status of call
cryptoe
left a comment
There was a problem hiding this comment.
Overall LGTM. Left some comments.
| // No cursors! | ||
| cursorYielder.close(); | ||
| return ReturnOrAwait.returnObject(Unit.instance()); | ||
| return ReturnOrAwait.returnObject(handedOffSegments); |
There was a problem hiding this comment.
If there are not handed of segments nulls will be passed. Is that intended ?
| @Override | ||
| public long getWeight() | ||
| { | ||
| return segments.size() * DATA_SERVER_WEIGHT_ESTIMATION; |
There was a problem hiding this comment.
We are estimating the number of rows to be 5000 for realtime segments rite ?
There was a problem hiding this comment.
Yes, added a comment
| } | ||
| List<InputSlice> handedOffSegments = new ArrayList<>(); | ||
| for (Object o : objects) { | ||
| if (o instanceof SegmentsInputSlice) { |
There was a problem hiding this comment.
Added a null check
| } | ||
|
|
||
| private void onFirstProcessorComplete(final Object firstResult) | ||
| private synchronized void checkFirstProcessorComplete() |
There was a problem hiding this comment.
Please add documentation regarding which threads need this.
| * Manager that chains processors: runs all processors generated by {@link #first} first, then based on its result, | ||
| * creates {@link #restFuture} using {@link #restFactory} and runs that next. | ||
| */ | ||
| public class ChainedProcessorManager<A, B, R> implements ProcessorManager<Object, R> |
There was a problem hiding this comment.
@gianm Can you please vet the changes in this class.
| @JsonProperty("dataSource") String dataSource, | ||
| @JsonProperty("segments") List<RichSegmentDescriptor> descriptors | ||
| @JsonProperty("segments") List<RichSegmentDescriptor> descriptors, | ||
| @JsonProperty("servedSegments") List<DataServerRequestDescriptor> servedSegments |
There was a problem hiding this comment.
This should be marked nullable for rolling upgrade cases.
Major changes
Currently, while reading results from realtime tasks, requests are sent on a segment level. This is slightly wasteful, as when contacting a data servers, it is possible to transfer results for all segments which it is hosting, instead of only one segment at a time.
One change this PR makes is to group the segments on the basis of servers. This reduces the number of queries to data servers made. Since we don't have access to the number of rows for realtime segments, the grouping is done with a fixed estimated number of rows for each realtime segment.
If the segments have already been handed off, the earlier approach made was to fetch the segments from deep storage.
After the above changes, a single worker processor thread might have to fetch all segments from deepstorage, missing an oppurtunity for parellelism, since each segment fetch from deepstorage, is a separate operation. This PR also edits the ChainedProcessorManager. The initial BaseLeafFrameProcessors return the list of segments which were handed off. The ProcessorManager then uses this information to create new list of frame processors, handle the newer handed off segments.
This PR has: