This issue discusses two proposals: "Online aggregation" and "Interactive Query Session Protocol" because they are very closely related to each other.
Online aggregation
The "online aggregation" concept was previously proposed in the context of Druid as "incremental query results". To avoid a clash with the term "incremental" as used in IncrementalIndexes, and because "online aggregation" seems to be the term most often used for this concept in scientific papers and elsewhere (possibly beginning with the Online aggregation paper by JM Hellerstein et al, 1997), it's suggested to stick to "online aggregation" from now on.
To facilitate human interaction with analytics and data exploration interfaces backed by Druid, and also to reduce Druid's perceived error rates, online aggregation must be implemented at the level of Druid's Broker nodes. See The case for interactive data exploration accelerators (IDEAs) by A Crotty et al, 2016, and related research for some data and ideas related to this topic.
To help readers better understand online aggregation, I first describe "offline" aggregation, i. e. what is already implemented in Druid:
"Offline" (default) aggregation
A simple HTTP request-response exchange between a Druid Broker node and a client (possibly proxied through a Router node).
Broker determines all segments needed to compute results for the query and the historical nodes on which those segments are loaded (except for segments for which results of the current query are found in cache). Broker sends requests to all those historicals to execute the query over all those segments. When historicals send the results back, Broker aggregates them all and sends the results back to the client (or a Router, if the request is proxied through it). This logic is implemented mainly in the CachingClusteredClient class.
Online aggregation
Works on top of an HTTP streaming or WebSocket connection between a Druid Broker node and a client (possibly proxied through a Router node).
Broker determines all segments needed to compute results for the query and the historicals on which those segments are loaded, but may not send requests to historicals to execute the query over all those segments right away. Instead, if, for example, there are 10 partitions in each interval covered by the query, Broker sends requests to historicals to execute the query over 3 segments in each interval (not necessarily those with partition numbers 0, 1, and 2; they may be chosen randomly and independently for each interval). As soon as results for at least two segments in each interval arrive at Broker, Broker sends requests to historicals to execute the query over the next portion of segments in each interval. Broker aggregates incoming segment query results from historicals in the background. From time to time Broker sends progressively more complete aggregation results (partial aggregation results) back to the client.
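The dispatch rule above can be sketched roughly as follows. This is only an illustration in Python, not Broker code: the class name WaveDispatcher and the parameters wave_size and refill_after are hypothetical, and the 3 / 2 defaults are taken from the example numbers in the text.

```python
import random


class WaveDispatcher:
    """Hands out segments in waves: `wave_size` per interval, in random order."""

    def __init__(self, segments_by_interval, wave_size=3, refill_after=2, seed=None):
        rng = random.Random(seed)
        self.pending = {}
        for interval, segments in segments_by_interval.items():
            order = list(segments)
            rng.shuffle(order)  # partitions are picked randomly, per interval
            self.pending[interval] = order
        self.wave_size = wave_size
        self.refill_after = refill_after
        self.in_flight = {interval: set() for interval in self.pending}
        self.arrived = {interval: 0 for interval in self.pending}

    def next_wave(self):
        """Return the (interval, segment) pairs to request next."""
        wave = []
        for interval, order in self.pending.items():
            batch = order[:self.wave_size]
            del order[:self.wave_size]
            self.in_flight[interval].update(batch)
            self.arrived[interval] = 0
            wave.extend((interval, segment) for segment in batch)
        return wave

    def on_result(self, interval, segment):
        """Record one segment result; return the next wave once every interval is ready."""
        self.in_flight[interval].discard(segment)
        self.arrived[interval] += 1
        ready = all(
            self.arrived[i] >= self.refill_after or not self.in_flight[i]
            for i in self.pending
        )
        if ready and any(self.pending.values()):
            return self.next_wave()
        return []
```

With two intervals of 10 segments each, next_wave() returns 6 requests, and the following 6 are released only after two results have arrived for both intervals.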
Fault tolerance: if some historical fails to return results for some segment (or times out), Broker tries to send this request to another historical node on which the segment is loaded. It may also avoid sending more requests to the failed historical node for other segments in the course of this online aggregation session.
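A minimal sketch of this failover behaviour, assuming a hypothetical send_query(host, segment) callable that either returns the segment result or raises ConnectionError / TimeoutError. The set avoided_hosts is shared across all segments of one online aggregation session.

```python
def query_with_failover(segment, replicas, send_query, avoided_hosts):
    """Try the replicas of `segment` in order, skipping hosts that already failed.

    `send_query` is a hypothetical stand-in for the Broker -> historical call.
    """
    for host in replicas:
        if host in avoided_hosts:
            continue
        try:
            return send_query(host, segment)
        except (ConnectionError, TimeoutError):
            # Remember the failure so later segments are steered away from this host.
            avoided_hosts.add(host)
    # No healthy replica left: the segment stays missing from the partial results.
    return None
```

Returning None rather than raising keeps the session alive; the missing segment would then be one reason to flag the partial results as possibly skewed.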
The reasoning and the principle behind the throttling of Broker -> historical requests: a user interacting with the analytical interface may quickly narrow down the query, start executing another, entirely different query, or otherwise lose interest in the results of the current query (by closing the browser tab, or going to "Home" in the interface). In this case, an attempt to compute the most precise results at the maximum possible speed (bound by the throughput of the Druid cluster) turns out to be a waste of resources.
In the example above, Broker sends requests for 3 segments out of 10 in each interval. A possible general principle is to have no more than 1 or 2 outstanding segment query executions on any historical at any time. This is because if historicals use HDDs or network-attached disks, and the segments are not in the page cache (reminder: they are all memory-mapped, but there is an idea to move away from that), additional segment query executions will likely queue up inside the historical anyway, contending for I/O. So that "1 or 2" (or more? configurable?) may depend on the recency of the interval, because more recent data is more likely to be in the page cache.
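The per-historical cap could look something like the sketch below. The function name dispatchable and the max_outstanding parameter are made up for illustration, and the sketch ignores the possible dependence on interval recency.

```python
from collections import Counter


def dispatchable(queue, outstanding, max_outstanding=2):
    """Split queued (segment, host) pairs into those to send now and those to hold.

    `outstanding` maps host -> number of segment queries currently in flight there.
    """
    load = Counter(outstanding)
    send, wait = [], []
    for segment, host in queue:
        if load[host] < max_outstanding:
            load[host] += 1
            send.append((segment, host))
        else:
            wait.append((segment, host))
    return send, wait
```

For example, with one query already running on h1, only one more segment is sent to h1 and the rest wait until a result comes back.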
Results format: along with ordinary aggregation results (on each iteration), Broker tells the client what fraction of the total data the current results are aggregated over. It may also send estimated error / confidence intervals of the partial aggregation, if it is able to compute them for the given aggregation function, and if the user opts to receive such data.
Broker also sends a flag indicating that the partial aggregation results may be skewed, e. g. if the partial aggregation contains no data from some interval(s) covered by the query (because of availability problems, slowness, errors, etc), or if there are disproportionate amounts of rows from different intervals and it's not possible to adjust for them for the aggregation function(s) that we compute, or if rows are partitioned between segments within intervals based on some key rather than randomly.
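To make the discussion of the results format more concrete, here is one possible shape of a partial result message. All field names (fractionAggregated, maybeSkewed, skewReasons, errorBounds) are hypothetical and are not part of any existing Druid API; listing the reasons next to the flag is an extra assumption of this sketch.

```python
import json


def partial_result_message(results, rows_aggregated, rows_total_estimate,
                           skew_reasons=(), error_bounds=None):
    """Build one partial-result message as a JSON string (hypothetical format)."""
    fraction = rows_aggregated / rows_total_estimate if rows_total_estimate else 0.0
    message = {
        "results": results,
        "fractionAggregated": fraction,
        "maybeSkewed": bool(skew_reasons),
        "skewReasons": sorted(skew_reasons),
    }
    # Error estimates are optional: only sent when computable and requested.
    if error_bounds is not None:
        message["errorBounds"] = error_bounds
    return json.dumps(message)
```

A client could then render the results immediately and show, for instance, a progress indicator driven by fractionAggregated and a warning when maybeSkewed is true.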
When (how often) broker sends partial results back to the client: it may be a combination of the following factors:
- Time: send something (whatever new results Broker has since the previous sending) every X seconds. This ensures interactivity and gives the user new information in a timely manner. X may increase as the query runs. E. g. timeouts may be: 0.5 seconds since the beginning of the query, 1s, 2s, 4s, 6s, 8s, 12s, 16s, ... A sending is skipped if no new segment results have been aggregated on Broker since the previous sending.
- Readiness thresholds: send partial results to the client when we estimate that 90% (95? 99?) of the rows that should be covered by the query are already aggregated. It needs to fire only within the first 0.5 seconds since the beginning of the query execution (or whatever the first timeout is), to make the analytics interface be perceived as even faster by the user. It probably shouldn't fire sooner than 0.1 seconds after the beginning of the query execution, because below that a user stops noticing the difference.
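The two factors above could be combined along the following lines. This is a sketch under stated assumptions: SendPolicy and poll are hypothetical names, the deadline list contains only the values spelled out in the text (what happens after 16s is left open here), and the readiness trigger is assumed to fire at most once.

```python
class SendPolicy:
    """Decides when Broker should push partial results to the client."""

    def __init__(self, deadlines=(0.5, 1, 2, 4, 6, 8, 12, 16),
                 readiness=0.9, min_delay=0.1):
        self.deadlines = list(deadlines)  # seconds since the beginning of the query
        self.readiness = readiness        # fraction of rows already aggregated
        self.min_delay = min_delay
        self.next_deadline = 0            # index into self.deadlines
        self.readiness_fired = False

    def poll(self, elapsed, has_new_results, fraction_done):
        """Return "readiness", "time", or None for a check made at `elapsed` seconds."""
        if (not self.readiness_fired
                and self.min_delay <= elapsed < self.deadlines[0]
                and fraction_done >= self.readiness):
            self.readiness_fired = True
            return "readiness"
        if (self.next_deadline < len(self.deadlines)
                and elapsed >= self.deadlines[self.next_deadline]):
            # Skip every deadline already passed, so a late poll sends only once.
            while (self.next_deadline < len(self.deadlines)
                   and elapsed >= self.deadlines[self.next_deadline]):
                self.next_deadline += 1
            # A sending is skipped when nothing new has been aggregated.
            return "time" if has_new_results else None
        return None
```

A Broker-side loop would call poll whenever a segment result is merged or a timer ticks, and send the current partial aggregation whenever the return value is not None.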
Aggregation: for simple Sum and Count aggregators, extrapolation of partial results to projected final values is fairly trivial, if historicals send the numbers of rows in segments and (for sum aggregators) the numbers of aggregated rows (i. e. Count) along with the segment results themselves. For some other aggregators, approximation of final results from partial results is non-trivial and may require using tools such as sketches (FYI @leerho, @edgan8) in Broker's memory, even though neither the data nor the aggregation requested by the user has anything to do with sketches. This may be further complicated by the requirement to estimate errors / confidence intervals of the partial aggregation results (see above).
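For Sum and Count, the simplest projection is linear scaling by the fraction of rows scanned so far. The sketch below assumes that rows in the segments not yet scanned resemble the scanned ones, which is exactly the assumption that breaks when the results are skewed; the function name is hypothetical.

```python
def project_additive(partial_value, rows_scanned, rows_total):
    """Scale a partial Sum or Count up to the full data set.

    `rows_scanned` and `rows_total` are row counts of the scanned segments
    and of all segments covered by the query, as reported by historicals.
    """
    if rows_scanned == 0:
        return None  # nothing aggregated yet, no basis for a projection
    return partial_value * rows_total / rows_scanned
```

For instance, a partial count of 300 over 3000 scanned rows out of 10000 projects to 1000. For a sum, the per-segment Count of aggregated rows additionally gives the mean value per matching row, which is the natural starting point for an error estimate.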
However, even if no approximation / projection of final results from partial results is done during online aggregation, and partial aggregation results are returned to the client as if they covered all the data that needs to be aggregated, this is still much better than offline aggregation. In fact, it may not be noticeably worse than a sophisticated approximation algorithm from the user's perspective, because users may only be interested in the relative weights of different rows in topN and groupBy results, or in a general trend in timeseries results, not in the absolute values. Many visualisations don't even show users absolute values and only let them perceive relative weights (pie charts, bar charts).
JSON vs SQL query: it would be nice if online aggregation were agnostic to the query format and worked with JSON as well as SQL queries.
Interactive Query Session Protocol
Interactive Query Session Protocol works on top of online aggregation. It doesn't seem that it provides much benefit without online aggregation implemented.
It may be implemented as an actual protocol on top of a single WebSocket connection, or just be a loose agreement about the expectations between clients and Druid nodes (Brokers, Routers), with each query remaining a separate HTTP streaming / WebSocket interaction.
The idea is that human interaction with an analytics interface often takes the form of a series of "drilling-down" queries: each following query is the previous query plus more dimension filters. As soon as a user starts the next query, they are no longer interested in further computation of the previous query, but it's likely that the user will return to that less selective query soon. There are also "sibling" query transitions on the same level of selectivity: e. g. a user moves from a query with a filter dim1 = value1 to an otherwise identical query with a filter dim1 = value2, or dim1 IN {value1, value2}, or dim1 != value1.
Interactive Query Session Protocol (IQSP)
All queries within the human interaction session should end up on the same Broker. Unless IQSP is implemented as an actual protocol on top of WebSocket, the user (analytics interface) needs to make sure to query the same Broker within the session, or a Router node needs to identify a session heuristically or with the help of some extra headers in requests, and route all queries within a session to the same Broker.
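One simple way a Router could pin a session to a Broker is to hash a session identifier. The sketch below assumes the client supplies such an identifier (for example in a request header, whose name would have to be agreed on); the function name is hypothetical and plain modulo hashing is used only for brevity.

```python
import hashlib


def broker_for_session(session_id, brokers):
    """Map a session identifier to one Broker, deterministically."""
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(brokers)
    # Sort so that every Router picks the same Broker regardless of list order.
    return sorted(brokers)[index]
```

Note that with modulo hashing, adding or removing a Broker remaps most sessions and loses their cached state; consistent hashing would limit that, at the cost of more code.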
As soon as the next query in the session arrives, broker stops online aggregation of the previous query, but doesn't drop the partial results and the query execution state until the end of the session. If a user returns to this query later in the session, broker will resume its execution from where it stopped.
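The pause-and-resume behaviour described above amounts to keeping per-query execution state keyed by the query itself. A rough sketch, assuming queries are JSON-like dicts and that "execution state" is reduced to the set of already aggregated segments plus the partial result (both names hypothetical):

```python
import json


class QuerySession:
    """Keeps the state of every query seen in one interactive session."""

    def __init__(self):
        self.states = {}    # canonical query string -> execution state
        self.active = None  # key of the query currently being aggregated

    @staticmethod
    def key(query):
        # Canonical form, so that key order in the JSON does not matter.
        return json.dumps(query, sort_keys=True)

    def switch_to(self, query):
        """Pause the active query and return the (possibly saved) state of `query`."""
        self.active = self.key(query)
        return self.states.setdefault(self.active, {"done": set(), "partial": None})

    def remaining(self, query, all_segments):
        """Segments of `query` that still have to be requested from historicals."""
        done = self.states[self.key(query)]["done"]
        return [s for s in all_segments if s not in done]

    def end(self):
        """End of session: drop all partial results and execution state."""
        self.states.clear()
        self.active = None
```

Returning to an earlier query then only requires requesting the segments listed by remaining, while the saved partial result can be sent to the client right away.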
After certain "sibling" transitions, broker may be able to provide some results to the client before querying any historical nodes, by manipulating the cached results of previous less selective queries and the previous "sibling" query. E. g. dim1 = value1 -> dim1 != value1 transition, and dim1 = value1 -> dim1 IN {value1, value2}, if dim1 = value2 already appeared earlier in the session.
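For additive aggregators (Count, Sum), the two transitions mentioned above reduce to subtraction and addition of cached results. The sketch below represents a result as a dict from bucket (e. g. a timeseries timestamp) to value, and rests on several assumptions that a real implementation would have to check: dim1 is single-valued, the cached results being combined cover the same set of segments, and null handling in the != filter matches the subtraction.

```python
def derive_not_equal(parent, equal):
    """Result for `dim1 != value1` from the less selective query and `dim1 = value1`."""
    return {bucket: total - equal.get(bucket, 0) for bucket, total in parent.items()}


def derive_in(*siblings):
    """Result for `dim1 IN {...}` from one cached `dim1 = value` result per value."""
    combined = {}
    for sibling in siblings:
        for bucket, value in sibling.items():
            combined[bucket] = combined.get(bucket, 0) + value
    return combined
```

Results derived this way are only as complete as the least complete cached input, so Broker would still continue online aggregation for the new query afterwards.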