@@ -27,19 +27,19 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* An iterator over an array of JSON objects. Uses {@link ObjectCodec} to deserialize JSON into regular Java objects.
*
* @param <T> the type of object returned by this iterator
*/
public class JsonIterator<T> implements Iterator<T>, Closeable
public class JsonIterator<T> implements CloseableIterator<T>
{
private JsonParser jp;
private ObjectCodec objectCodec;
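For context, a minimal sketch of what the switch to `CloseableIterator` buys: the interface combines `Iterator` and `Closeable`, so instances work with try-with-resources and inherit the interface's default combinators, such as the `flatMap` fixed in the next hunk. The anonymous iterator below is illustrative only, not code from this PR:

```java
import java.util.Arrays;
import java.util.Iterator;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

// Illustrative only: a CloseableIterator can be driven and released with
// try-with-resources, which is the practical payoff of the change above.
public class CloseableIteratorDemo
{
  public static void main(String[] args) throws Exception
  {
    final Iterator<String> inner = Arrays.asList("a", "b").iterator();
    try (CloseableIterator<String> it = new CloseableIterator<String>()
    {
      @Override
      public boolean hasNext()
      {
        return inner.hasNext();
      }

      @Override
      public String next()
      {
        return inner.next();
      }

      @Override
      public void close()
      {
        // release the underlying resource here, e.g. an InputStream
      }
    }) {
      while (it.hasNext()) {
        System.out.println(it.next());
      }
    }
  }
}
```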
@@ -66,10 +66,10 @@ default <R> CloseableIterator<R> flatMap(Function<T, CloseableIterator<R>> funct

return new CloseableIterator<R>()
{
CloseableIterator<R> iterator = findNextIeteratorIfNecessary();
CloseableIterator<R> iterator = findNextIteratorIfNecessary();
**Contributor:** 👍

@Nullable
private CloseableIterator<R> findNextIeteratorIfNecessary()
private CloseableIterator<R> findNextIteratorIfNecessary()
{
while ((iterator == null || !iterator.hasNext()) && delegate.hasNext()) {
if (iterator != null) {
@@ -105,7 +105,7 @@ public R next()
return iterator.next();
}
finally {
findNextIeteratorIfNecessary();
findNextIteratorIfNecessary();
}
}

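A hedged usage sketch of the combinator containing the renamed helper; it assumes the `CloseableIterators.withEmptyBaggage` utility from Druid's common package, which wraps a plain `Iterator` with a no-op `close`:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

// Hedged sketch: flatMap lazily opens one inner iterator at a time and closes
// it when exhausted, which is what the renamed findNextIteratorIfNecessary
// helper above takes care of.
public class FlatMapDemo
{
  public static void main(String[] args) throws Exception
  {
    List<List<Integer>> nested = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
    try (CloseableIterator<Integer> flat =
             CloseableIterators.withEmptyBaggage(nested.iterator())
                 .flatMap(inner -> CloseableIterators.withEmptyBaggage(inner.iterator()))) {
      while (flat.hasNext()) {
        System.out.println(flat.next()); // prints 1, 2, 3
      }
    }
  }
}
```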
57 changes: 56 additions & 1 deletion docs/ingestion/native-batch.md
@@ -53,7 +53,7 @@ The detailed behavior of the Parallel task is different depending on the [`parti
See each `partitionsSpec` for more details.

To use this task, the [`inputSource`](#input-sources) in the `ioConfig` should be _splittable_ and `maxNumConcurrentSubTasks` should be set to larger than 1 in the `tuningConfig`.
Otherwise, this task runs sequentially; the `index_paralllel` task reads each input file one by one and creates segments by itself.
Otherwise, this task runs sequentially; the `index_parallel` task reads each input file one by one and creates segments by itself.
The supported splittable input formats for now are:

- [`s3`](#s3-input-source) reads data from AWS S3 storage.
@@ -63,6 +63,7 @@ The supported splittable input formats for now are:
- [`http`](#http-input-source) reads data from HTTP servers.
- [`local`](#local-input-source) reads data from local storage.
- [`druid`](#druid-input-source) reads data from a Druid datasource.
- [`sql`](#sql-input-source) reads data from an RDBMS source.

Some other cloud storage types are supported with the legacy [`firehose`](#firehoses-deprecated).
The below `firehose` types are also splittable. Note that only text formats are supported
@@ -1310,6 +1311,59 @@ A spec that applies a filter and reads a subset of the original datasource's col
The spec above will only return the `page` and `user` dimensions and the `added` metric.
Only rows where `page` = `Druid` will be returned.

### SQL Input Source

The SQL input source is used to read data directly from an RDBMS.
The SQL input source is _splittable_ and can be used by the [Parallel task](#parallel-task); each worker task reads one SQL query from the list of queries.
Since this input source has a fixed input format for reading events, no `inputFormat` field needs to be specified in the ingestion spec when using it.
Please refer to the Recommended practices section below before using this input source.

|property|description|required?|
|--------|-----------|---------|
|type|This should be "sql".|Yes|
|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support and this extension must be loaded into Druid. For database types `mysql` and `postgresql`, the `connectorConfig` support is provided by [mysql-metadata-storage](../development/extensions-core/mysql.md) and [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extensions respectively.|Yes|
|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case-insensitive column names in query results.|No|
|sqls|List of SQL queries; each query retrieves a portion of the data to be indexed.|Yes|

An example SqlInputSource spec is shown below:

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "sql",
"database": {
"type": "mysql",
"connectorConfig": {
"connectURI": "jdbc:mysql://host:port/schema",
"user": "user",
"password": "password"
}
},
"sqls": ["SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"]
},
...
```

The spec above will read all events from two separate SQL queries for the interval `2013-01-01/2013-01-02`.
Each query runs in its own sub-task, so this example spawns two sub-tasks.

**Recommended practices**

Compared to the other native batch input sources, the SQL input source reads input data differently, so it is helpful to consider the following points before using it in a production environment:
**Contributor:**
Can we mark this as a big warning in the docs? I think a similar warning should be made on line 1316 indicating that this functionality is experimental and not yet recommended for production use.

**Contributor Author:**
Is this necessary? This input source has been used in a couple of production-type environments internally, and as long as the indexer is allocated enough disk space by the cluster operator, it shouldn't run into any issues.
I've added some text asking the user to review these points before using the input source.

**Contributor:**

I wouldn't say the SqlInputSource is experimental. An experimental feature in Druid means either 1) the feature hasn't been stabilized yet and there could be potential bugs while using it, or 2) the feature might be stable but could be removed in the future because we know there is a better way. In this case, I believe there will be a better way of ingesting from databases, such as the SqlSupervisor mentioned here, but even then, we can build the SqlSupervisor on top of the SqlInputSource (the supervisor can probably use it). Also, we have been providing the SqlFirehose as a non-experimental feature.

**Contributor:**

Maybe instead of experimental, we should call this feature Beta. It indicates that there is future work coming and we may want to change the behavior or configuration of the InputSource, so users shouldn't rely on the API / behavior being consistent across releases.

Since there are no integration tests, I'm concerned about calling this GA, because someone would have to run through these tests manually before each release to make sure we did not accidentally break the SqlInputSource.

**Contributor:**

> Maybe instead of experimental, we should call this feature Beta. It indicates that there is future work coming and we may want to change the behavior or configuration of the InputSource, so users shouldn't rely on the API / behavior being consistent across releases.

I agree that a more fine-grained feature lifecycle would be nice, but it should be discussed separately on the dev mailing list. As for APIs/behaviors, all Druid InputSources are UnstableApi, which may change in breaking ways at any time, even between minor Druid release lines.

> Since there are no integration tests, I'm concerned about calling this GA, because someone would have to run through these tests manually before each release to make sure we did not accidentally break the SqlInputSource.

Since all inputSources are UnstableApis, I wouldn't call any of them GA. Integration tests sound nice; we should add them in the near future.


* During indexing, each sub-task executes one of the SQL queries and the results are stored locally on disk. The sub-tasks then read the data from these local input files and generate segments. Presently, there is no restriction on the size of the generated files, so the MiddleManagers or Indexers must have sufficient disk capacity for the volume of data being indexed.

* Filtering the SQL queries based on the intervals specified in the `granularitySpec` avoids retrieving and storing unwanted data locally in the indexing sub-tasks. For example, if the `intervals` specified in the `granularitySpec` is `["2013-01-01/2013-01-02"]` and the SQL query is `SELECT * FROM table1`, `SqlInputSource` will read all the data for `table1` based on the query, even though only data between the intervals specified will be indexed into Druid.

* Pagination may be used on the SQL queries to ensure that each query pulls a similar amount of data, thereby improving the efficiency of the sub-tasks; see the sketch after this list.

* Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`.
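
As a hypothetical illustration of the filtering and pagination points above (the table name, `timestamp` column, ordering key, and page size are assumptions for the sketch, not requirements), the `sqls` list could combine both practices like this:

```json
"sqls": [
  "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 23:59:59' ORDER BY id LIMIT 50000 OFFSET 0",
  "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 23:59:59' ORDER BY id LIMIT 50000 OFFSET 50000"
]
```

Each entry becomes its own sub-task, so the two pages are read in parallel while the `WHERE` clause keeps every read within the ingestion interval.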



## Firehoses (Deprecated)

Firehoses are deprecated in 0.17.0. It's highly recommended to use the [Input source](#input-sources) instead.
@@ -1544,6 +1598,7 @@ This firehose will accept any type of parser, but will only utilize the list of
This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec.
For each query, the results are fetched locally and indexed.
If there are multiple queries from which data needs to be indexed, queries are prefetched in the background, up to `maxFetchCapacityBytes` bytes.
This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md#parallel-task).
This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. See the extension documentation for more detailed ingestion examples.

Requires one of the following extensions:
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
@@ -173,8 +172,7 @@ public void insert(
}
}

@VisibleForTesting
protected static boolean isStatementException(Throwable e)
public static boolean isStatementException(Throwable e)
{
return e instanceof StatementException ||
(e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
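The visibility change above presumably lets code outside this class reuse the check; a hypothetical retry helper (the `Callable` workload and `MAX_TRIES` are illustrative, not part of this PR):

```java
import java.util.concurrent.Callable;
import org.apache.druid.metadata.SQLMetadataStorageActionHandler;

// Hypothetical helper, not from this PR: retry a unit of work only when the
// failure is a StatementException, possibly wrapped in a CallbackFailedException.
public final class StatementRetry
{
  private static final int MAX_TRIES = 3;

  public static <T> T run(Callable<T> work) throws Exception
  {
    for (int tries = 1; ; tries++) {
      try {
        return work.call();
      }
      catch (Exception e) {
        // Rethrow immediately unless the failure is statement-level and we have tries left.
        if (!SQLMetadataStorageActionHandler.isStatementException(e) || tries >= MAX_TRIES) {
          throw e;
        }
      }
    }
  }
}
```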
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.metadata.input;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;

import java.util.List;

/**
* Module that installs {@link org.apache.druid.data.input.InputSource} implementations
*/
public class InputSourceModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.<Module>of(
new SimpleModule("InputSourceModule")
.registerSubtypes(
new NamedType(SqlInputSource.class, "sql")
)
);
}

@Override
public void configure(Binder binder)
{
}
}
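
As a hedged sketch of how this registration is consumed (the mapper setup below is illustrative, not code from this PR): installing the module's Jackson modules registers the `sql` named subtype, so an `inputSource` whose `"type"` is `"sql"` can be bound to `SqlInputSource`. The `database` connector subtype (e.g. `mysql`) is registered separately by the corresponding metadata storage extension.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.metadata.input.InputSourceModule;

// Hedged sketch: after registering the module's Jackson modules, the mapper
// can resolve the polymorphic "sql" type name from an ingestion spec.
public class RegisterSqlInputSource
{
  public static void main(String[] args)
  {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModules(new InputSourceModule().getJacksonModules());
    System.out.println("Registered the \"sql\" input source subtype");
  }
}
```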