Add druid-multi-stage-query extension. #12918
Conversation
This pull request introduces 4 alerts when merging 6d6223e into 7fb1153 - view on LGTM.com.
This pull request introduces 4 alerts when merging 728ba30 into 7fb1153 - view on LGTM.com.
The useless null check is fixed in 728ba30. The uncontrolled data in path expressions seems to be due to using task IDs in file paths. However, these are safe because task IDs are validated prior to submission (using
This pull request introduces 4 alerts when merging b0f7496 into 7fb1153 - view on LGTM.com.
clintropolis left a comment:
🤘
I was going to only do a high-level review since this thing is huge, but ended up doing more of a mid-level review to see how things fit together and a quick skim of all non-test code. I did not review tests at all.
I pulled the branch and did some testing and things were working as expected. Very nice work 🎉
final int outstandingProcessors;

if (hasParquet(inputSlices)) {
this one stuck out a bit while i was scanning... any thoughts on a better way to do this in the future? maybe some mechanism for an input format to indicate how much data it's going to read or something?
A couple thoughts:
- We could provide a way for InputFormats to declare how much memory they expect to use, and size the number of concurrent processors accordingly.
- AFAIK, this is only a problem with Parquet. We could use a different Parquet reader, or write our own, such that this extra memory use doesn't happen. For example we could memory-map the Parquet files, instead of loading them up into the heap as the current reader does.
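The first idea could look something like the sketch below. Every name here is hypothetical (none of this exists in Druid): an input format advertises its expected per-reader memory, and the processor-sizing code divides the available budget accordingly.

```java
// Hypothetical sketch: an input format advertises expected per-reader memory,
// and the controller sizes the number of concurrent processors to fit a budget.
// Neither this interface nor the sizing helper exists in Druid today.
public class ProcessorSizing
{
  /** Hypothetical hint an input format could expose. */
  interface MemoryHintedInputFormat
  {
    long expectedBytesPerReader();
  }

  /** Size concurrent processors so total expected reader memory fits in the budget. */
  static int maxConcurrentProcessors(long memoryBudgetBytes, long expectedBytesPerReader, int processorCap)
  {
    final long fit = memoryBudgetBytes / Math.max(1L, expectedBytesPerReader);
    return (int) Math.max(1L, Math.min(processorCap, fit));
  }

  public static void main(String[] args)
  {
    // e.g. 512 MB budget, a Parquet reader expected to use 128 MB per file
    System.out.println(maxConcurrentProcessors(512_000_000L, 128_000_000L, 8));
  }
}
```

The point of the sketch is only the shape of the contract: formats that declare nothing would fall back to the cap, while heavy formats such as Parquet would shrink concurrency instead of blowing the heap.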
// Determine the max number of workers in JVM for memory allocations.
if (toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager) {
  // CliIndexer
  numWorkersInJvm = injector.getInstance(WorkerConfig.class).getCapacity();
i wonder if longer term this makes sense to be its own config? sort of seems to be giving a double meaning, though close enough in purpose that it's fine
This code is using the appropriate property: it's trying to figure out how many tasks could run in the same JVM, which on Indexers is controlled by druid.worker.capacity and on MMs is always 1. So a new config property isn't needed. However, a new injectable object (or a new method on an existing object) would remove the need for an instanceof here.
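The injectable-object idea could be sketched like this (all names hypothetical, not actual Druid types): a tiny abstraction answers "how many tasks can share this JVM", so callers never need the instanceof check.

```java
// Hypothetical sketch of an injectable "tasks per JVM" abstraction.
// Neither TaskJvmCapacity nor these factory methods exist in Druid.
public class JvmCapacity
{
  /** Answers how many tasks may run concurrently in this JVM. */
  interface TaskJvmCapacity
  {
    int tasksPerJvm();
  }

  /** On a MiddleManager, each task runs in its own JVM. */
  static final TaskJvmCapacity MIDDLE_MANAGER = () -> 1;

  /** On an Indexer, tasks share one JVM, bounded by druid.worker.capacity. */
  static TaskJvmCapacity indexer(int workerCapacity)
  {
    return () -> workerCapacity;
  }

  public static void main(String[] args)
  {
    System.out.println(MIDDLE_MANAGER.tasksPerJvm()); // 1
    System.out.println(indexer(4).tasksPerJvm());     // 4
  }
}
```

With Guice, the appropriate implementation would be bound per server type, and memory-sizing code would simply inject TaskJvmCapacity.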
paul-rogers left a comment:
@gianm, this is an impressive bit of work! It is also a large chunk of code, so submitting a partial review concurrent with continued review.
private final CounterSnapshotsTree taskCountersForLiveReports = new CounterSnapshotsTree();

// For live reports. stage number -> stage phase
private final ConcurrentHashMap<Integer, ControllerStagePhase> stagePhasesForLiveReports = new ConcurrentHashMap<>();
What is the concurrency model here? Thanks for the great intro notes, which explain that the controller is a state machine for running the execution DAG. There are multiple sources of input: the "main" routine (the controller task), events from workers, and (perhaps) events from the execution manager (i.e. Overlord) if it detects worker failure.
A typical way to handle such a state machine is that each input is handled as an event: the entire state machine is locked, the event is processed, the state machine is unlocked, and the cycle repeats. Where the event is isolated (worker update, say), then only that one worker is locked.
Here, however, it looks like each field has its own concurrency mechanism via the concurrent hash maps, etc.
Is the model that events are not atomic, but rather the steps within the event are atomic? If so, does this introduce race conditions as variable A is updated unsynchronized with B? Are the variables here defined in a way that they can be independent and thus updated in any order? Or, should we enforce update order via locks on the whole controller? If controller-wide synchronization, then do the variables need their own synchronization?
Looks like the Kernel has an event queue, which makes sense. Should the synchronized items here be part of the Kernel so they are similarly synchronized?
Does the Kernel update any of these fields? If so, do we have to worry about deadlock between the Kernel and fields here?
The concurrency model of the ControllerQueryKernel itself is that it is not thread safe, and is meant to be used by a single thread fed by an event queue. That thread and event queue resides in ControllerImpl.
The various concurrent maps here enable live reports: they're updated by the main kernel thread, and read by http threads that serve live reports. There isn't a risk of deadlock or races or anything like that, because of the single-writer, multiple-readers situation. The comment "For live reports" is present on all fields that work this way. I admit that this phrasing is a bit terse. I'll expand the comments so it's clearer what is going on.
Perhaps, in the future, live reports can be pulled out into a separate class to make the semantics clear (and simplify the large controller class.)
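The single-writer, many-readers arrangement described above can be sketched in miniature (class and field names hypothetical, not the actual ControllerImpl fields): only the kernel thread writes the concurrent map, and HTTP threads read a point-in-time copy.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the live-reports concurrency model: a single writer
// thread updates the map, many reader threads take weakly consistent snapshots.
// No additional locking is needed beyond the concurrent map itself.
public class LiveReports
{
  // stage number -> stage phase; written only by the kernel thread.
  private final ConcurrentHashMap<Integer, String> stagePhases = new ConcurrentHashMap<>();

  /** Called only from the single kernel thread. */
  public void updatePhase(int stageNumber, String phase)
  {
    stagePhases.put(stageNumber, phase);
  }

  /** Called from HTTP threads serving live reports; a weakly consistent view is fine. */
  public Map<Integer, String> snapshot()
  {
    return Map.copyOf(stagePhases);
  }

  public static void main(String[] args)
  {
    final LiveReports reports = new LiveReports();
    reports.updatePhase(0, "READING_INPUT");
    reports.updatePhase(0, "RESULTS_READY");
    System.out.println(reports.snapshot());
  }
}
```

Because there is exactly one writer, readers can never observe a torn update of a single entry, and cross-field ordering doesn't matter for best-effort live reports.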
);
}

public TaskStatus runTask(final Closer closer)
Consideration for the future: this is a complex function, and is quite large. It uses a pile of variables to keep track of state. It might be easier to understand (and test) if the function were converted to an inner class, with the local variables as members, and the various "stanzas" of functionality as methods. The computer doesn't care, but the class approach might make it easier for us poor humans to understand.
Some parts are broken out, such as initializeQueryDefAndState and the query run.
The thought is that execution seems to have three parts, and it might be helpful to make that structure explicit via class structure:
- Plan: convert native query to MSQ execution plan
- Run: launch workers, monitor, gather results, publish segments
- Report: do the paperwork to record what actually happened
Reporting could be a separate class (as was recently done in the SQL statement refactoring). Planning could be a separate abstraction (as in the SQL layer), leaving execution to be encapsulated in its own dedicated abstraction. Reporting might be passed to all three steps: gathered in the first two steps, emitted in the last step.
Sounds reasonable. For now, I'll at least add comments making it clearer which code is involved in planning, running, and reporting.
 */
private void addToKernelManipulationQueue(Consumer<ControllerQueryKernel> kernelConsumer)
{
  if (!kernelManipulationQueue.offer(kernelConsumer)) {
Should the kernel maintain its own queue? Or, should there be a "kernel gateway" that holds onto the kernel and its event queue?
The "kernel gateway" is the main controller thread. I'll add comments (javadocs on the kernelManipulationQueue field and the addToKernelManipulationQueue method, probably) that make it clearer how the concurrency works here.
// The following logic assumes that output of all the stages without a shuffle retain the partition boundaries
// of the input to that stage. This may not always be the case. For example GROUP BY queries without an ORDER BY
// clause. This works for QueryKit generated queries uptil now, but it should be reworked as it might not
// always be the case
+1 for the comments. They greatly help us reviewers understand the intent of the code.
A minor suggestion would be to label the purpose of each stanza. For example, this stanza does "shuffle assignment"?
This block of code is dedicated to allocating the segment IDs that will be used as part of an INSERT / REPLACE. The comment at the top of the block reads:
Allocate segments, if this is the final stage of an ingestion.
However, it's a good chunk of code (45 lines, including comments) and that comment at the top may be easy to miss. Let me see if it's practical to extract this logic into its own method.
final QueryDefinition queryDef,
final InputSpecSlicerFactory inputSpecSlicerFactory,
final Closer closer
) throws Exception
The key concept here is to run a plan, returning when done. Presumably the "main" task thread that calls this will wait for completion. Does it do anything else while waiting? Probably not: all the action from RPC messages?
If so, it would be handy to encapsulate this concept via a "run" method (on that runner class mentioned above), and handle the synchronization internally. The method returns with either a success or failure. In either case, all cleanup will have been done. That approach makes the separation of concerns just a bit clearer.
This logic runs in the main task thread and blocks until the query is complete. It doesn't do much (most of the action does happen via RPC) but it does do something (mainly, kick things off and then wait for updates to come in via kernelManipulationQueue). I'm not sure what this means for your suggestion of how to reorganize things to be clearer. Are you suggesting changing a particular method's return type or adding a new method?
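The "kick things off and wait" loop described here can be sketched as follows (all names hypothetical, loosely modeled on the kernelManipulationQueue description in the comments above): the main task thread blocks on an event queue and applies each event to the single-threaded kernel until the query finishes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the controller's main loop: one thread drains an event
// queue and applies each event to a kernel that is not itself thread safe.
public class KernelLoop
{
  static class Kernel
  {
    int finishedStages;
    final int totalStages;

    Kernel(int totalStages)
    {
      this.totalStages = totalStages;
    }

    boolean isDone()
    {
      return finishedStages >= totalStages;
    }
  }

  private final BlockingQueue<Consumer<Kernel>> queue = new LinkedBlockingQueue<>();

  /** Called from RPC threads: enqueue work rather than touching the kernel directly. */
  public void submit(Consumer<Kernel> event)
  {
    queue.add(event);
  }

  /** Called from the main task thread: blocks until all stages report completion. */
  public void run(Kernel kernel) throws InterruptedException
  {
    while (!kernel.isDone()) {
      queue.take().accept(kernel); // the only thread that mutates the kernel
    }
  }
}
```

Because every mutation funnels through one consumer thread, the kernel needs no internal synchronization at all.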
{
  return MultiStageQueryContext.isFinalizeAggregations(query.getQueryContext()) == false
      && query.getContextBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true) == false
      && query instanceof GroupByQuery;
Nit:

return query instanceof GroupByQuery
    && !MultiStageQueryContext.isFinalizeAggregations(query.getQueryContext())
    && !query.getContextBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true);

I read this as, "rollup uses a group by query. That query must not finalize aggregations or unnest multi-values." It is unclear to this newbie what it means to finalize aggregations and then unnest; does the SQL syntax support this subtlety?
Rewrote it in that way.
Also added this javadoc that should hopefully make the finalization and unnesting requirements more clear:
/**
* Whether a native query represents an ingestion with rollup.
*
* Checks for three things:
*
* - The query must be a {@link GroupByQuery}, because rollup requires columns to be split into dimensions and
* aggregations.
* - The query must not finalize aggregations, because rollup requires inserting the intermediate type of
* complex aggregations, not the finalized type. (So further rollup is possible.)
* - The query must explicitly disable {@link GroupByQueryConfig#CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING}, because
* groupBy on multi-value dimensions implicitly unnests, which is not desired behavior for rollup at ingestion time.
*/
final RowSignature signature,
final ClusterBy clusterBy,
final ColumnMappings columnMappings
)
So, I'm reading now about creating the query "plan". In the same class, I read about query execution and results reporting, as well as event handling.
I wonder, does Overlord require that all logic for a task be in a single class? Probably not? That suggestion above about dividing up the logic would help us poor reviewers avoid the need to keep so many factors in mind in a single class.
I wonder, does Overlord require that all logic for a task be in a single class?
No, it doesn't, of course. It's already split up to some degree: all of the querykit and kernel packages are bits of logic ultimately used by this ControllerImpl. But I agree it'd be good to split up this class further. My preference would be to do it in follow-ups. Probably the first thing to tackle would be to isolate all the ingestion-related planning into its own class (i.e. what locks to acquire upfront, whether or not the ingestion is rollup-based, what the segment schema is going to be, etc). That would simplify the main class quite a bit.
  return shardColumns;
}

private static boolean isClusterByBoosted(final ClusterBy clusterBy)
Maybe a note about what "boosted" means here?
final Set<String> outputColumnsInOrder = new LinkedHashSet<>(segmentSortOrder);

// Then the query-level CLUSTERED BY.
// Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected.
Does the SQL layer prevent this state of affairs? The case only occurs in tasks submitted outside of SQL?
IIRC, the SQL layer allows it. In this case the expression is still used for partitioning data between segments, but not for sorting data within a segment. In the catalog, it may make sense to expose a "segment sort by" for advanced users, to make this case clearer. (Today, CLUSTERED BY covers both partitioning and sorting, because 95% of the time people want the same key for both. So it would really be a 5%-of-the-time advanced users thing.)
Extended the comment to spell this out:
// Then the query-level CLUSTERED BY.
// Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected.
// Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows
// within an individual segment.
Map<String, AggregatorFactory> outputColumnAggregatorFactories = new HashMap<>();

if (isRollupQuery) {
Comment to say why rollup is special here?
Added:
Populate aggregators from the native query when doing an ingest in rollup mode.
paul-rogers left a comment:
Overall, looks excellent. Made a quick pass over the core bits, skipped the many "support" classes. Nothing is a show-stopper. The comments are either nits (which can be done later) or architectural suggestions (which are inherently issues for later.) So, the PR seems fine as-is.
LGTM (non-binding)
}

/**
 * Returns a controller task ID given a
Nit: "given a"... what?
Oops. Should say SQL query id.
} else {
  // Normally we expect the SQL layer to validate that __time for INSERT is a TIMESTAMP type, which would
  // be a long at execution time. So a nice user-friendly message isn't needed here: it would only happen
  // if the SQL layer is bypassed. Nice, friendly users wouldn't do that :)
This level of "behind the scenes" explanation is super helpful!
  return Pair.of(new QueryableIndexSegment(index, dataSegment.getId()), closer);
}
catch (IOException | SegmentLoadingException e) {
  throw new RuntimeException(e);
Should we include in the error information about the segment that failed to download? And, differentiate between the coordinator failure and the download failure?
Good idea. I added the segment ID to the error message, and split this up into two try blocks so we can have different error messages for the two things that can fail.
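The two-try split described above could look roughly like this sketch (interfaces and method names are hypothetical stand-ins, not the actual Druid code): each failure mode gets its own message, and both name the segment involved.

```java
// Hypothetical sketch of splitting one try into two, so the error says which
// step failed (locate vs. download) and names the segment involved.
public class SegmentFetch
{
  static class FetchException extends RuntimeException
  {
    FetchException(String message, Throwable cause)
    {
      super(message, cause);
    }
  }

  interface Coordinator
  {
    String locate(String segmentId) throws Exception;
  }

  interface Downloader
  {
    byte[] download(String location) throws Exception;
  }

  static byte[] fetchSegment(String segmentId, Coordinator coordinator, Downloader downloader)
  {
    final String location;
    try {
      location = coordinator.locate(segmentId);
    }
    catch (Exception e) {
      throw new FetchException("Could not locate segment [" + segmentId + "] via coordinator", e);
    }
    try {
      return downloader.download(location);
    }
    catch (Exception e) {
      throw new FetchException("Could not download segment [" + segmentId + "]", e);
    }
  }
}
```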
private volatile ControllerClient controllerClient;
private volatile WorkerClient workerClient;
private volatile Bouncer processorBouncer;
private volatile boolean controllerAlive = true;
Since these are marked volatile, it means that we have to worry about concurrent reads and writes. These, however, seem fundamental. Is it true that, say, a worker starts running and these values are then concurrently set at some random time later? If so, should there be some state or locking to enforce ordering? (I.e. don't do "real work" until the fundamentals are provided.)
Or, is it the case that there is an implied state machine: that these are set in one thread (say, via a work order or some such?), and only then does another thread consume them? If so, do they need to be volatile? What is the sync mechanism?
Reading further, it appears these are set in runTask based on the context. Yet, the context is available in the constructor. Should at least the selfDruidNode be set in the constructor and be final?
I'll move selfDruidNode and processorBouncer to the constructor. They can both be there.
The clients workerClient and controllerClient are set when a worker starts running and are never re-assigned. They're then used by processing threads so the workers can contact each other during the shuffle. I'll add comments about this:
/**
* Set once in {@link #runTask} and never reassigned.
*/
private volatile ControllerClient controllerClient;
/**
* Set once in {@link #runTask} and never reassigned. Used by processing threads so we can contact other workers
* during a shuffle.
*/
private volatile WorkerClient workerClient;
/**
* Set to false by {@link #controllerFailed()} as a way of enticing the {@link #runTask} method to exit promptly.
*/
private volatile boolean controllerAlive = true;
  }
}

private ListeningExecutorService makeProcessingPool()
For us newbies, maybe add a note to explain what the processing pool is. Or, point to where this is explained.
Added this note:
/**
* Decorates the server-wide {@link QueryProcessingPool} such that any Callables and Runnables, not just
* {@link PrioritizedCallable} and {@link PrioritizedRunnable}, may be added to it.
*
* In production, the underlying {@link QueryProcessingPool} pool is set up by
* {@link org.apache.druid.guice.DruidProcessingModule}.
*/
 *
 * May be zero for some kinds of slices, even if they contain data, if the input is not file-based.
 */
int numFiles();
Nit: fileCount reads just a bit better than numFiles.
}

/**
 * Adds stageIds for those stages which donot require any input from any other stages
Nit "donot" -> "do not"
 */
// No ordering, but there is a limit or an offset. These work by funneling everything through a single partition.
// So there is no point in forcing any particular partitioning. Since everything is funnelled into a single
// partition without a ClusterBy, we donot need to necessarily create it via the resultShuffleSpecFactory provided
Nit: "donot" -> "do not"
import java.util.List;
import java.util.Optional;

public class GroupByQueryKit implements QueryKit<GroupByQuery>
A general comment on this approach is that we seem to be mapping from one big query definition (the native query) to another (the MSQ query definition.) Other projects have had much success decomposing a query plan into isolated functional units (often called "operators") that feed into one another via a pipeline. A future direction here might be to do the same: have a frame processor hold a pipeline of operators for scan, sort, merge, limit, offset, etc. Might make the logic a bit more flexible and reusable.
I agree this is an excellent direction to go in.
I have made a preview build of Druid 24 that includes #12918 and #12919 if you want to play with it and help test. You can get it by running:
If you want to see this PR in action I made a video based on the above build: https://www.youtube.com/watch?v=xUzYuPfzARk (make sure to watch in the highest available quality)
@JsonProperty("status") TaskState status,
@JsonProperty("error") @Nullable MSQErrorReport errorReport,
@JsonProperty("warnings") Queue<MSQErrorReport> warningReports,
@JsonProperty("startTime") @Nullable DateTime startTime,
nit: This is annotated as Nullable, however, there is a Preconditions check for notNull on the same.
Good point. I removed the precondition check. The intent is this field may be null if a query has not started executing yet.
import org.apache.druid.indexing.common.TaskReport;

@JsonTypeName(MSQTaskReport.REPORT_KEY)
public class MSQTaskReport implements TaskReport
For this class, I think we would need a type field as well to allow deserialization in the clients. Reference comment for the same: IngestionStatsAndErrorsTaskReport#L94.
Hmm. That seems like a funky hack. I'd hate to have to replicate that here. Let me look into why it's needed and see if there's a better solution.
  return new MSQStagesReport(stages);
}

@JsonValue
Having JsonValue here causes the MSQStagesReport to be serialized as an array, instead of an object. This is causing a serde error in the MSQTaskReportPayload.
I wrote a simple test case to verify this:

MSQTaskReport msqTaskReport = new MSQTaskReport(
    "test-id",
    new MSQTaskReportPayload(
        new MSQStatusReport(
            TaskState.RUNNING,
            null,
            new ArrayDeque<>(),
            DateTime.now(),
            1
        ),
        new MSQStagesReport(ImmutableList.of()),
        null,
        null
    )
);
String serialized = jsonMapper.writeValueAsString(msqTaskReport);
MSQTaskReport deserialized = jsonMapper.readValue(serialized, MSQTaskReport.class);

The above snippet is failing. The msqTaskReport serializes as:

......"stages":[{"stageNumber":0,"definition":{"id":......

instead of

......"stages":{"stages":[{"stageNumber":0,"definition":{"id":......

If we change the serialization by converting JsonValue to JsonProperty("stages") this should work fine; however, the downstream consumers (web-console) might fail, so we can defer this change until after the PR merges. Another option would be to change the deserialization instead in the MSQTaskReportPayload/MSQStagesReport (which I am unsure how to achieve). WDYT? Jackson is not my strong suit, so LMK in case I missed any obvious way to achieve serde.
Ah. Good point. The way to do this:
Another option would be to change the deserialization instead in the MSQTaskReportPayload/MSQStagesReport
Would be to remove the @JsonProperty annotation from the constructor field. This causes it to deserialize directly from a list. I'll make that change, and add deserialization tests.
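A minimal Jackson sketch of that fix, using a simplified stand-in class (StagesReport here is hypothetical, not the real MSQStagesReport): @JsonValue serializes the object as a bare JSON array, and a delegating @JsonCreator (single argument, no @JsonProperty) deserializes it back from that same array.

```java
import java.util.List;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.ObjectMapper;

// Simplified stand-in demonstrating @JsonValue + delegating @JsonCreator:
// the object round-trips through a bare JSON array, not a wrapping object.
public class StagesReportDemo
{
  static class StagesReport
  {
    private final List<String> stages;

    @JsonCreator // no @JsonProperty: Jackson delegates the whole JSON value to this argument
    public StagesReport(List<String> stages)
    {
      this.stages = stages;
    }

    @JsonValue
    public List<String> getStages()
    {
      return stages;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final String json = mapper.writeValueAsString(new StagesReport(List.of("stage0", "stage1")));
    System.out.println(json); // a bare array, not {"stages":[...]}
    final StagesReport roundTripped = mapper.readValue(json, StagesReport.class);
    System.out.println(roundTripped.getStages());
  }
}
```

This keeps the wire format unchanged for existing consumers such as the web console, while making deserialization symmetric with serialization.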
This pull request introduces 4 alerts when merging 9f9fcc4 into 6fec1d4 - view on LGTM.com.
Thanks for reviewing, @paul-rogers @LakshSingla @abhishekagarwal87 & @clintropolis! I've pushed a new commit that addresses your collective comments. Please let me know if you have further comments.
@gianm, thanks for the explanations and updated comments. I agree that most of my comments are better addressed via incremental PRs as the team works with this code. Thanks again for this big push forward.
This pull request introduces 4 alerts when merging 337a75c into 6fec1d4 - view on LGTM.com.
I'm looking at the LGTM issues. The commit 277a5e8 removes the useless null check (it was a stageId check in ControllerImpl). It also adds additional validation to task IDs to attempt to satisfy the check for uncontrolled data in a path expression. As I mentioned earlier, these IDs are already checked, so I don't think there's really a problem. The changes are about trying to make sure LGTM sees it the same way.
This pull request introduces 6 alerts when merging 990767b into f0fc450 - view on LGTM.com.
How are there more? Looking at the alerts LGTM raises, I don't think it understands that
I reviewed the LGTM alerts and they look OK to me. I don't think suppression is the way to go, since I wouldn't want to add a suppression at the end of the path. (The LGTM check is looking for user-provided data that makes its way into a file path; in these six cases, it's all task IDs, which are enforced as being path-safe.) So the best thing to do seems to be to merge the patch without suppressing the alerts.
Yeah, I guess it doesn't like the validation method changing from void to returning a thing, and confused some stuff? It seems like these places should all be safe though, so I think we can ignore them.
Here we go then.
Functionality
This patch adds the final pieces to be able to run SQL SELECT, INSERT, and REPLACE statements as multi-stage queries using indexing service tasks. This is enough to provide an initial implementation of #12262.
The functionality in this patch is provided as an extension. When the extension is loaded:
- A new endpoint, /druid/v2/sql/task, is added. It accepts SQL SELECT, INSERT, and REPLACE statements. The EXTERN function is usable through this endpoint.
- New task types query_controller and query_worker are added. These provide the runtime for SQL queries submitted to the new endpoint.

Notes for reviewers
Developer notes are at: https://github.com/gianm/druid/blob/multi-stage-query-tasks/extensions-core/multi-stage-query/README.md. Please refer to this doc for a description of the key classes and concepts.
Try this at home
You can try this out yourself. First, build Druid from this branch and run bin/start-micro-quickstart. Then, save the following content to insert.json. (Human-readable version: https://gist.github.com/gianm/9183b1fdecdb3e9894a344c40bcb7d4a).

{ "query": "REPLACE INTO \"kttm_simple\"\nOVERWRITE ALL\nWITH kttm_data AS (\nSELECT * FROM TABLE(\n  EXTERN(\n    '{\"type\":\"http\",\"uris\":[\"https://static.imply.io/data/kttm/kttm-v2-2019-08-25.json.gz\"]}',\n    '{\"type\":\"json\"}',\n    '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"agent_category\",\"type\":\"string\"},{\"name\":\"agent_type\",\"type\":\"string\"},{\"name\":\"browser\",\"type\":\"string\"},{\"name\":\"browser_version\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"continent\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"event_subtype\",\"type\":\"string\"},{\"name\":\"loaded_image\",\"type\":\"string\"},{\"name\":\"adblock_list\",\"type\":\"string\"},{\"name\":\"forwarded_for\",\"type\":\"string\"},{\"name\":\"language\",\"type\":\"string\"},{\"name\":\"number\",\"type\":\"long\"},{\"name\":\"os\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"},{\"name\":\"platform\",\"type\":\"string\"},{\"name\":\"referrer\",\"type\":\"string\"},{\"name\":\"referrer_host\",\"type\":\"string\"},{\"name\":\"region\",\"type\":\"string\"},{\"name\":\"remote_address\",\"type\":\"string\"},{\"name\":\"screen\",\"type\":\"string\"},{\"name\":\"session\",\"type\":\"string\"},{\"name\":\"session_length\",\"type\":\"long\"},{\"name\":\"timezone\",\"type\":\"string\"},{\"name\":\"timezone_offset\",\"type\":\"long\"},{\"name\":\"window\",\"type\":\"string\"}]'\n  )\n))\n\nSELECT\n  FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n  session,\n  agent_category,\n  agent_type,\n  browser,\n  browser_version,\n  MV_TO_ARRAY(\"language\") AS \"language\",\n  os,\n  city,\n  country,\n  forwarded_for AS ip_address\nFROM kttm_data\nWHERE os = 'iOS'\nPARTITIONED BY HOUR\nCLUSTERED BY browser, session" }

Run the curl command:

You'll get a task ID back. When the task finishes, the datasource kttm_simple will have been created.

Current state of the extension
Future PRs (by other people 🙂) will add a web console UI and documentation, rounding out the feature.
Once this patch and the web console and docs changes are in place, I believe SQL INSERT and REPLACE functionality would be ready for prime time with one caveat. Fault tolerance isn't yet implemented, so if one worker task fails, the entire query fails. I think this is OK for many batch ingest use cases, but of course isn't OK for all of them. Fault tolerance is one of the areas we'll want to work on next.
SQL SELECT functionality has a couple of additional caveats, beyond the lack of fault tolerance. One is that results are stored in the task report, which isn't partitioned, so it becomes a bottleneck for very large resultsets. Another is that there is no caching for Druid segments, so they must be downloaded anew on each query. But the SELECT functionality does work! Try it; it's fun. I like doing COUNT DISTINCT queries with useApproximateCountDistinct: false and topN-style queries with useApproximateTopN: false on high-cardinality columns. These queries generate a ton of intermediate data, and are a good exercise for the multi-stage system.

This patch was developed at Imply, partially by me, but also partially by others such as @cryptoe, @LakshSingla, @adarshsanjeev, and @paul-rogers. I'm looking forward to seeing where we take this in the open source project 🙂