diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java
index 2a241f66de7c..c03e5f6d8e84 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java
@@ -27,11 +27,11 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 /**
@@ -39,7 +39,7 @@
  *
  * @param <T> the type of object returned by this iterator
  */
-public class JsonIterator<T> implements Iterator<T>, Closeable
+public class JsonIterator<T> implements CloseableIterator<T>
 {
   private JsonParser jp;
   private ObjectCodec objectCodec;
diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
index d5915369eada..af1baafe4198 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
@@ -66,10 +66,10 @@ default <R> CloseableIterator<R> flatMap(Function<T, CloseableIterator<R>> function)
     return new CloseableIterator<R>()
     {
-      CloseableIterator<R> iterator = findNextIeteratorIfNecessary();
+      CloseableIterator<R> iterator = findNextIteratorIfNecessary();
 
       @Nullable
-      private CloseableIterator<R> findNextIeteratorIfNecessary()
+      private CloseableIterator<R> findNextIteratorIfNecessary()
       {
         while ((iterator == null || !iterator.hasNext()) && delegate.hasNext()) {
           if (iterator != null) {
@@ -105,7 +105,7 @@ public R next()
           return iterator.next();
         }
         finally {
-          findNextIeteratorIfNecessary();
+          findNextIteratorIfNecessary();
         }
       }
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 02c8a321f984..981f9f700938 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -53,7 +53,7 @@ The detailed behavior of the Parallel task is different depending on the [`partitionsSpec`]
 See each `partitionsSpec` for more details.
 
 To use this task, the [`inputSource`](#input-sources) in the `ioConfig` should be _splittable_ and `maxNumConcurrentSubTasks` should be set to larger than 1 in the `tuningConfig`.
-Otherwise, this task runs sequentially; the `index_paralllel` task reads each input file one by one and creates segments by itself.
+Otherwise, this task runs sequentially; the `index_parallel` task reads each input file one by one and creates segments by itself.
 The supported splittable input formats for now are:
 
 - [`s3`](#s3-input-source) reads data from AWS S3 storage.
@@ -63,6 +63,7 @@ The supported splittable input formats for now are:
 - [`http`](#http-input-source) reads data from HTTP servers.
 - [`local`](#local-input-source) reads data from local storage.
 - [`druid`](#druid-input-source) reads data from a Druid datasource.
+- [`sql`](#sql-input-source) reads data from an RDBMS source.
 
 Some other cloud storage types are supported with the legacy [`firehose`](#firehoses-deprecated).
 The below `firehose` types are also splittable.
 Note that only text formats are supported.
@@ -1310,6 +1311,59 @@ A spec that applies a filter and reads a subset of the original datasource's columns
 
 This spec above will only return the `page`, `user` dimensions and `added` metric.
 Only rows where `page` = `Druid` will be returned.
 
+### SQL Input Source
+
+The SQL input source is used to read data directly from an RDBMS.
+The SQL input source is _splittable_ and can be used by the [Parallel task](#parallel-task), where each worker task reads one SQL query from the list of queries.
+Since this input source has a fixed input format for reading events, no `inputFormat` field needs to be specified in the ingestion spec when using this input source.
+Please refer to the Recommended practices section below before using this input source.
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|This should be "sql".|Yes|
+|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support, and this extension must be loaded into Druid. For the database types `mysql` and `postgresql`, the `connectorConfig` support is provided by the [mysql-metadata-storage](../development/extensions-core/mysql.md) and [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extensions respectively.|Yes|
+|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case-insensitive column names in query results.|No|
+|sqls|List of SQL queries, where each query retrieves a portion of the data to be indexed.|Yes|
+
+An example SqlInputSource spec is shown below:
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "sql",
+        "database": {
+            "type": "mysql",
+            "connectorConfig": {
+                "connectURI": "jdbc:mysql://host:port/schema",
+                "user": "user",
+                "password": "password"
+            }
+        },
+        "sqls": ["SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"]
+      },
+...
+```
+
+The spec above reads all events from two separate SQL queries for the interval `2013-01-01/2013-01-02`.
+Each of the SQL queries runs in its own sub-task, so the example above spawns two sub-tasks.
+
+**Recommended practices**
+
+Compared to the other native batch input sources, the SQL input source reads input data differently, so consider the following points before using it in a production environment:
+
+* During indexing, each sub-task executes one of the SQL queries and stores the results locally on disk. The sub-tasks then read the data from these local input files and generate segments. Presently, there is no restriction on the size of the generated files, so the MiddleManagers or Indexers must have sufficient disk capacity for the volume of data being indexed.
+
+* Filtering the SQL queries based on the intervals specified in the `granularitySpec` avoids retrieving and storing unwanted data locally during indexing. For example, if the `intervals` specified in the `granularitySpec` is `["2013-01-01/2013-01-02"]` and the SQL query is `SELECT * FROM table1`, `SqlInputSource` will read all of the data for `table1` based on the query, even though only data between those intervals will be indexed into Druid.
+
+* Pagination may be used on the SQL queries to ensure that each query pulls a similar amount of data, thereby improving the efficiency of the sub-tasks; see the illustrative queries after this list.
+
+* Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`.
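+For example, the following `sqls` (illustrative only; the table, the `id` ordering column, and the page size of 50000 are placeholders) combine interval filtering with pagination to split one scan into two similarly sized sub-task inputs:
+
+```json
+"sqls": [
+  "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59' ORDER BY id LIMIT 50000 OFFSET 0",
+  "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59' ORDER BY id LIMIT 50000 OFFSET 50000"
+]
+```
+
+A deterministic `ORDER BY` keeps the pages disjoint, and the `LIMIT`/`OFFSET` syntax shown works on both supported database types (`mysql` and `postgresql`).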
+
 ## Firehoses (Deprecated)
 
 Firehoses are deprecated in 0.17.0. It's highly recommended to use the [Input source](#input-sources) instead.
@@ -1544,6 +1598,7 @@ This firehose will accept any type of parser, but will only utilize the list of
 
 This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec.
 For each query, the results are fetched locally and indexed.
 If there are multiple queries from which data needs to be indexed, queries are prefetched in the background, up to `maxFetchCapacityBytes` bytes.
+This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md#parallel-task).
 This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. See the extension documentation for more detailed ingestion examples.
 
 Requires one of the following extensions:
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
index eb9e9916cac8..e6fdcfb45f1a 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
@@ -173,8 +172,7 @@ public void insert(
     }
   }
 
-  @VisibleForTesting
-  protected static boolean isStatementException(Throwable e)
+  public static boolean isStatementException(Throwable e)
   {
     return e instanceof StatementException ||
            (e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
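Making `isStatementException` public (rather than `@VisibleForTesting protected`) lets the new SQL input source share the firehose's retry policy: transient database errors are retried, while statement errors such as malformed SQL fail fast, since they would fail identically on every attempt. A minimal sketch of that predicate composition, as used later in `SqlEntity.openCleanableFile` (the `connector` parameter stands in for any configured `SQLFirehoseDatabaseConnector`):

```java
import java.util.function.Predicate;

import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.SQLMetadataStorageActionHandler;

class RetryPredicateSketch
{
  // Retry only when the error is transient (e.g. a dropped connection)
  // and is not a statement error, which would fail on every attempt.
  static Predicate<Throwable> shouldRetry(SQLFirehoseDatabaseConnector connector)
  {
    return e -> connector.isTransientException(e)
                && !SQLMetadataStorageActionHandler.isStatementException(e);
  }
}
```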
diff --git a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
new file mode 100644
index 000000000000..0423af4f9a03
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.input;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.List;
+
+/**
+ * Module that installs {@link org.apache.druid.data.input.InputSource} implementations.
+ */
+public class InputSourceModule implements DruidModule
+{
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return ImmutableList.of(
+        new SimpleModule("InputSourceModule")
+            .registerSubtypes(
+                new NamedType(SqlInputSource.class, "sql")
+            )
+    );
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
new file mode 100644
index 000000000000..724077a4c0d2
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.input;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
+import org.apache.druid.metadata.SQLMetadataStorageActionHandler;
+import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.exceptions.ResultSetException;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents an RDBMS-based input resource and knows how to read query results from the resource using SQL queries.
+ */
+public class SqlEntity implements InputEntity
+{
+  private static final Logger LOG = new Logger(SqlEntity.class);
+
+  private final String sql;
+  private final ObjectMapper objectMapper;
+  private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector;
+  private final boolean foldCase;
+
+  public SqlEntity(
+      String sql,
+      SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector,
+      boolean foldCase,
+      ObjectMapper objectMapper
+  )
+  {
+    this.sql = sql;
+    this.sqlFirehoseDatabaseConnector = Preconditions.checkNotNull(
+        sqlFirehoseDatabaseConnector,
+        "SQL Metadata Connector not configured!"
+ ); + this.foldCase = foldCase; + this.objectMapper = objectMapper; + } + + public String getSql() + { + return sql; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Please use fetch() instead"); + } + + @Override + public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException + { + final File tempFile = File.createTempFile("druid-sql-entity", ".tmp", temporaryDirectory); + return openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, tempFile); + + } + + /** + * Executes a SQL query on the specified database and fetches the result into the given file. + * The result file is deleted if the query execution or the file write fails. + * + * @param sql The SQL query to be executed + * @param sqlFirehoseDatabaseConnector The database connector + * @param objectMapper An object mapper, used for deserialization + * @param foldCase A boolean flag used to enable or disabling case sensitivity while handling database column names + * + * @return A {@link InputEntity.CleanableFile} object that wraps the file containing the SQL results + */ + + public static CleanableFile openCleanableFile( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + ObjectMapper objectMapper, + boolean foldCase, + File tempFile + ) + throws IOException + { + try (FileOutputStream fos = new FileOutputStream(tempFile); + final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos);) { + + // Execute the sql query and lazily retrieve the results into the file in json format. + // foldCase is useful to handle differences in case sensitivity behavior across databases. + sqlFirehoseDatabaseConnector.retryWithHandle( + (handle) -> { + ResultIterator> resultIterator = handle.createQuery( + sql + ).map( + (index, r, ctx) -> { + Map resultRow = foldCase ? new CaseFoldedMap() : new HashMap<>(); + ResultSetMetaData resultMetadata; + try { + resultMetadata = r.getMetaData(); + } + catch (SQLException e) { + throw new ResultSetException("Unable to obtain metadata from result set", e, ctx); + } + try { + for (int i = 1; i <= resultMetadata.getColumnCount(); i++) { + String key = resultMetadata.getColumnName(i); + String alias = resultMetadata.getColumnLabel(i); + Object value = r.getObject(i); + resultRow.put(alias != null ? alias : key, value); + } + } + catch (SQLException e) { + throw new ResultSetException("Unable to access specific metadata from " + + "result set metadata", e, ctx); + } + return resultRow; + } + ).iterator(); + jg.writeStartArray(); + while (resultIterator.hasNext()) { + jg.writeObject(resultIterator.next()); + } + jg.writeEndArray(); + jg.close(); + return null; + }, + (exception) -> sqlFirehoseDatabaseConnector.isTransientException(exception) + && !(SQLMetadataStorageActionHandler.isStatementException(exception)) + ); + return new CleanableFile() + { + @Override + public File file() + { + return tempFile; + } + + @Override + public void close() + { + if (!tempFile.delete()) { + LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath()); + } + } + }; + } + catch (Exception e) { + if (!tempFile.delete()) { + LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath()); + } + throw new IOException(e); + } + } + + private static class CaseFoldedMap extends HashMap + { + public static final long serialVersionUID = 1L; + + @Override + public Object get(Object obj) + { + return super.get(obj == null ? 
null : StringUtils.toLowerCase((String) obj)); + } + + @Override + public Object put(String key, Object value) + { + return super.put(key == null ? null : StringUtils.toLowerCase(key), value); + } + + @Override + public boolean containsKey(Object obj) + { + return super.containsKey(obj == null ? null : StringUtils.toLowerCase((String) obj)); + } + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputFormat.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputFormat.java new file mode 100644 index 000000000000..6d0aa59a20d2 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputFormat.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; + +import java.io.File; + +public class SqlInputFormat implements InputFormat +{ + private final ObjectMapper objectMapper; + + public SqlInputFormat(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public boolean isSplittable() + { + return true; + } + + @Override + public InputEntityReader createReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory + ) + { + return new SqlReader(inputRowSchema, source, temporaryDirectory, objectMapper); + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java new file mode 100644 index 000000000000..c7dfbb7fa365 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.metadata.input;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.InputEntityIteratingReader;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+public class SqlInputSource extends AbstractInputSource implements SplittableInputSource<String>
+{
+  private final List<String> sqls;
+  private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector;
+  private final ObjectMapper objectMapper;
+  private final boolean foldCase;
+
+  @JsonCreator
+  public SqlInputSource(
+      @JsonProperty("sqls") List<String> sqls,
+      @JsonProperty("foldCase") boolean foldCase,
+      @JsonProperty("database") SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector,
+      @JacksonInject @Smile ObjectMapper objectMapper
+  )
+  {
+    Preconditions.checkArgument(sqls.size() > 0, "No SQL queries provided");
+
+    this.sqls = sqls;
+    this.foldCase = foldCase;
+    this.sqlFirehoseDatabaseConnector = Preconditions.checkNotNull(
+        sqlFirehoseDatabaseConnector,
+        "SQL Metadata Connector not configured!"
+ ); + this.objectMapper = objectMapper; + } + + @JsonProperty + public List getSqls() + { + return sqls; + } + + @JsonProperty + public boolean isFoldCase() + { + return foldCase; + } + + @JsonProperty("database") + public SQLFirehoseDatabaseConnector getSQLFirehoseDatabaseConnector() + { + return sqlFirehoseDatabaseConnector; + } + + @Override + public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return sqls.stream().map(InputSplit::new); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return sqls.size(); + } + + @Override + public SplittableInputSource withSplit(InputSplit split) + { + return new SqlInputSource( + Collections.singletonList(split.get()), + foldCase, + sqlFirehoseDatabaseConnector, + objectMapper + ); + } + + @Override + protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) + { + final SqlInputFormat inputFormat = new SqlInputFormat(objectMapper); + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + createSplits(inputFormat, null) + .map(split -> new SqlEntity(split.get(), sqlFirehoseDatabaseConnector, foldCase, objectMapper)).iterator(), + temporaryDirectory + ); + } + + @Override + public boolean needsFormat() + { + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqlInputSource that = (SqlInputSource) o; + return foldCase == that.foldCase && + sqls.equals(that.sqls) && + sqlFirehoseDatabaseConnector.equals(that.sqlFirehoseDatabaseConnector); + } + + @Override + public int hashCode() + { + return Objects.hash(sqls, sqlFirehoseDatabaseConnector, foldCase); + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java new file mode 100644 index 000000000000..4657158c0463 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.metadata.input;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.prefetch.JsonIterator;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reader exclusively for {@link SqlEntity}
+ */
+public class SqlReader extends IntermediateRowParsingReader<Map<String, Object>>
+{
+  private final InputRowSchema inputRowSchema;
+  private final SqlEntity source;
+  private final File temporaryDirectory;
+  private final ObjectMapper objectMapper;
+
+  SqlReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      File temporaryDirectory,
+      ObjectMapper objectMapper
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = (SqlEntity) source;
+    this.temporaryDirectory = temporaryDirectory;
+    this.objectMapper = objectMapper;
+  }
+
+  @Override
+  protected CloseableIterator<Map<String, Object>> intermediateRowIterator() throws IOException
+  {
+    final Closer closer = Closer.create();
+    // The results are fetched into local storage, as this avoids having to keep a persistent database connection open for a long time.
+    final InputEntity.CleanableFile resultFile = closer.register(source.fetch(temporaryDirectory, null));
+    FileInputStream inputStream = new FileInputStream(resultFile.file());
+    JsonIterator<Map<String, Object>> jsonIterator = new JsonIterator<>(new TypeReference<Map<String, Object>>()
+    {
+    }, inputStream, closer, objectMapper);
+    return jsonIterator;
+  }
+
+  @Override
+  protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
+  {
+    return Collections.singletonList(
+        MapInputRowParser.parse(
+            inputRowSchema.getTimestampSpec(),
+            inputRowSchema.getDimensionsSpec(),
+            intermediateRow
+        )
+    );
+  }
+
+  @Override
+  protected Map<String, Object> toMap(Map<String, Object> intermediateRow)
+  {
+    return intermediateRow;
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
index 9daa231f9b7e..0b5863d671d6 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
@@ -22,32 +22,23 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.guice.annotations.Smile;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
-import org.skife.jdbi.v2.ResultIterator;
-import org.skife.jdbi.v2.exceptions.CallbackFailedException; -import org.skife.jdbi.v2.exceptions.ResultSetException; -import org.skife.jdbi.v2.exceptions.StatementException; +import org.apache.druid.metadata.input.SqlEntity; import javax.annotation.Nullable; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -86,7 +77,10 @@ public SqlFirehoseFactory( this.sqls = sqls; this.objectMapper = objectMapper; - this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.sqlFirehoseDatabaseConnector = Preconditions.checkNotNull( + sqlFirehoseDatabaseConnector, + "SQL Metadata Connector not configured!" + ); this.foldCase = foldCase; this.connectorConfig = null; } @@ -94,79 +88,8 @@ public SqlFirehoseFactory( @Override protected InputStream openObjectStream(String sql, File fileName) throws IOException { - Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!"); - try (FileOutputStream fos = new FileOutputStream(fileName)) { - final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos); - sqlFirehoseDatabaseConnector.retryWithHandle( - (handle) -> { - ResultIterator> resultIterator = handle.createQuery( - sql - ).map( - (index, r, ctx) -> { - Map resultRow = foldCase ? new CaseFoldedMap() : new HashMap<>(); - ResultSetMetaData resultMetadata; - try { - resultMetadata = r.getMetaData(); - } - catch (SQLException e) { - throw new ResultSetException("Unable to obtain metadata from result set", e, ctx); - } - try { - for (int i = 1; i <= resultMetadata.getColumnCount(); i++) { - String key = resultMetadata.getColumnName(i); - String alias = resultMetadata.getColumnLabel(i); - Object value = r.getObject(i); - resultRow.put(alias != null ? 
alias : key, value); - } - } - catch (SQLException e) { - throw new ResultSetException("Unable to access specific metadata from " + - "result set metadata", e, ctx); - } - return resultRow; - } - ).iterator(); - jg.writeStartArray(); - while (resultIterator.hasNext()) { - jg.writeObject(resultIterator.next()); - } - jg.writeEndArray(); - jg.close(); - return null; - }, - (exception) -> { - final boolean isStatementException = exception instanceof StatementException || - (exception instanceof CallbackFailedException - && exception.getCause() instanceof StatementException); - return sqlFirehoseDatabaseConnector.isTransientException(exception) && !(isStatementException); - } - ); - } + SqlEntity.openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, fileName); return new FileInputStream(fileName); - - } - - private static class CaseFoldedMap extends HashMap - { - public static final long serialVersionUID = 1L; - - @Override - public Object get(Object obj) - { - return super.get(StringUtils.toLowerCase((String) obj)); - } - - @Override - public Object put(String key, Object value) - { - return super.put(StringUtils.toLowerCase(key), value); - } - - @Override - public boolean containsKey(Object obj) - { - return super.containsKey(StringUtils.toLowerCase((String) obj)); - } } @Override diff --git a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java new file mode 100644 index 000000000000..67126b0c7b2f --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.cfg.MapperConfig; +import com.fasterxml.jackson.databind.introspect.AnnotatedClass; +import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class InputSourceModuleTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + private final String SQL_NAMED_TYPE = "sql"; + + @Before + public void setUp() + { + for (Module jacksonModule : new InputSourceModule().getJacksonModules()) { + mapper.registerModule(jacksonModule); + } + } + + @Test + public void testSubTypeRegistration() + { + MapperConfig config = mapper.getDeserializationConfig(); + AnnotatedClass annotatedClass = AnnotatedClassResolver.resolveWithoutSuperTypes(config, SqlInputSource.class); + List subtypes = mapper.getSubtypeResolver() + .collectAndResolveSubtypesByClass(config, annotatedClass) + .stream() + .map(NamedType::getName) + .collect(Collectors.toList()); + Assert.assertNotNull(subtypes); + Assert.assertEquals(SQL_NAMED_TYPE, Iterables.getOnlyElement(subtypes)); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java new file mode 100644 index 000000000000..46a171b50c92 --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.metadata.input;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+public class SqlEntityTest
+{
+  @Rule
+  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+  private final ObjectMapper mapper = TestHelper.makeSmileMapper();
+  private TestDerbyConnector derbyConnector;
+
+  private final String TABLE_NAME_1 = "FOOS_TABLE";
+
+  private final String VALID_SQL = "SELECT timestamp,a,b FROM FOOS_TABLE";
+  private final String INVALID_SQL = "DONT SELECT timestamp,a,b FROM FOOS_TABLE";
+  private final String resultJson = "[{\"a\":\"0\"," +
+                                    "\"b\":\"0\"," +
+                                    "\"timestamp\":\"2011-01-12T00:00:00.000Z\"" +
+                                    "}]";
+
+  @Before
+  public void setUp()
+  {
+    for (Module jacksonModule : new InputSourceModule().getJacksonModules()) {
+      mapper.registerModule(jacksonModule);
+    }
+  }
+
+  @Test
+  public void testExecuteQuery() throws IOException
+  {
+    derbyConnector = derbyConnectorRule.getConnector();
+    SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
+    testUtils.createAndUpdateTable(TABLE_NAME_1, 1);
+    File tmpFile = File.createTempFile(
+        "testQueryResults",
+        ""
+    );
+    InputEntity.CleanableFile queryResult = SqlEntity.openCleanableFile(
+        VALID_SQL,
+        testUtils.getDerbyFirehoseConnector(),
+        mapper,
+        true,
+        tmpFile
+    );
+    InputStream queryInputStream = new FileInputStream(queryResult.file());
+    String actualJson = IOUtils.toString(queryInputStream, StandardCharsets.UTF_8);
+
+    Assert.assertEquals(resultJson, actualJson);
+    testUtils.dropTable(TABLE_NAME_1);
+  }
+
+  @Test(expected = IOException.class)
+  public void testFailOnInvalidQuery() throws IOException
+  {
+    derbyConnector = derbyConnectorRule.getConnector();
+    SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
+    testUtils.createAndUpdateTable(TABLE_NAME_1, 1);
+    File tmpFile = File.createTempFile(
+        "testQueryResults",
+        ""
+    );
+    InputEntity.CleanableFile queryResult = SqlEntity.openCleanableFile(
+        INVALID_SQL,
+        testUtils.getDerbyFirehoseConnector(),
+        mapper,
+        true,
+        tmpFile
+    );
+
+    Assert.assertTrue(tmpFile.exists());
+  }
+
+  @Test
+  public void testFileDeleteOnInvalidQuery() throws IOException
+  {
+    // The test parameters here are the same as those used for testFailOnInvalidQuery().
+    // The only difference is that this test checks whether the temporary file is deleted upon failure.
+    derbyConnector = derbyConnectorRule.getConnector();
+    SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
+    testUtils.createAndUpdateTable(TABLE_NAME_1, 1);
+    File tmpFile = File.createTempFile(
+        "testQueryResults",
+        ""
+    );
+    try {
+      SqlEntity.openCleanableFile(
+          INVALID_SQL,
+          testUtils.getDerbyFirehoseConnector(),
+          mapper,
+          true,
+          tmpFile
+      );
+    }
+    // Let's catch the exception so we can verify that the temporary file was deleted.
+ catch (IOException e) { + Assert.assertFalse(tmpFile.exists()); + } + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java new file mode 100644 index 000000000000..7afa88894f26 --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.io.FileUtils; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.metadata.MetadataStorageConnectorConfig; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.TestHelper; +import org.easymock.EasyMock; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.skife.jdbi.v2.DBI; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class SqlInputSourceTest +{ + private static final List FIREHOSE_TMP_DIRS = new ArrayList<>(); + private final String TABLE_NAME_1 = "FOOS_TABLE_1"; + private final String TABLE_NAME_2 = "FOOS_TABLE_2"; + + private final List SQLLIST1 = ImmutableList.of("SELECT timestamp,a,b FROM FOOS_TABLE_1"); + private final List SQLLIST2 = ImmutableList.of( + "SELECT timestamp,a,b FROM FOOS_TABLE_1", + "SELECT timestamp,a,b FROM FOOS_TABLE_2" + ); + + private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec( + 
DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")), + new ArrayList<>(), + new ArrayList<>() + ), + Collections.emptyList() + ); + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + private final ObjectMapper mapper = TestHelper.makeSmileMapper(); + private TestDerbyConnector derbyConnector; + + @Before + public void setUp() + { + for (Module jacksonModule : new InputSourceModule().getJacksonModules()) { + mapper.registerModule(jacksonModule); + } + } + + @AfterClass + public static void teardown() throws IOException + { + for (File dir : FIREHOSE_TMP_DIRS) { + FileUtils.forceDelete(dir); + } + } + + private void assertResult(List rows, List sqls) + { + Assert.assertEquals(10 * sqls.size(), rows.size()); + rows.sort(Comparator.comparing(Row::getTimestamp) + .thenComparingInt(r -> Integer.valueOf(r.getDimension("a").get(0))) + .thenComparingInt(r -> Integer.valueOf(r.getDimension("b").get(0)))); + int rowCount = 0; + for (int i = 0; i < 10; i++) { + for (int j = 0; j < sqls.size(); j++) { + final Row row = rows.get(rowCount); + String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i); + Assert.assertEquals(timestampSt, row.getTimestamp().toString()); + Assert.assertEquals(i, Integer.valueOf(row.getDimension("a").get(0)).intValue()); + Assert.assertEquals(i, Integer.valueOf(row.getDimension("b").get(0)).intValue()); + rowCount++; + } + } + } + + private File createFirehoseTmpDir(String dirSuffix) throws IOException + { + final File firehoseTempDir = File.createTempFile( + SqlInputSourceTest.class.getSimpleName(), + dirSuffix + ); + FileUtils.forceDelete(firehoseTempDir); + FileUtils.forceMkdir(firehoseTempDir); + FIREHOSE_TMP_DIRS.add(firehoseTempDir); + return firehoseTempDir; + } + + @Test + public void testSerde() throws IOException + { + mapper.registerSubtypes(TestSerdeFirehoseConnector.class); + final SqlInputSourceTest.TestSerdeFirehoseConnector testSerdeFirehoseConnector = new SqlInputSourceTest.TestSerdeFirehoseConnector( + new MetadataStorageConnectorConfig()); + final SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testSerdeFirehoseConnector, mapper); + final String valueString = mapper.writeValueAsString(sqlInputSource); + final SqlInputSource inputSourceFromJson = mapper.readValue(valueString, SqlInputSource.class); + Assert.assertEquals(sqlInputSource, inputSourceFromJson); + } + + @Test + public void testSingleSplit() throws Exception + { + derbyConnector = derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); + final File tempDir = createFirehoseTmpDir("testSingleSplit"); + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testUtils.getDerbyFirehoseConnector(), mapper); + InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir); + CloseableIterator resultIterator = sqlReader.read(); + final List rows = new ArrayList<>(); + while (resultIterator.hasNext()) { + rows.add(resultIterator.next()); + } + assertResult(rows, SQLLIST1); + testUtils.dropTable(TABLE_NAME_1); + } + + + @Test + public void testMultipleSplits() throws Exception + { + derbyConnector = derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); + testUtils.createAndUpdateTable(TABLE_NAME_2, 10); + final File tempDir = 
createFirehoseTmpDir("testMultipleSplit"); + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper); + InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir); + CloseableIterator resultIterator = sqlReader.read(); + final List rows = new ArrayList<>(); + while (resultIterator.hasNext()) { + rows.add(resultIterator.next()); + } + assertResult(rows, SQLLIST2); + testUtils.dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_2); + } + + @Test + public void testNumSplits() + { + derbyConnector = derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper); + InputFormat inputFormat = EasyMock.createMock(InputFormat.class); + Stream> sqlSplits = sqlInputSource.createSplits(inputFormat, null); + Assert.assertEquals(SQLLIST2, sqlSplits.map(InputSplit::get).collect(Collectors.toList())); + Assert.assertEquals(2, sqlInputSource.estimateNumSplits(inputFormat, null)); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(SqlInputSource.class) + .withPrefabValues( + ObjectMapper.class, + new ObjectMapper(), + new ObjectMapper() + ) + .withIgnoredFields("objectMapper") + .withNonnullFields("sqls", "sqlFirehoseDatabaseConnector") + .usingGetClass() + .verify(); + } + + @JsonTypeName("test") + private static class TestSerdeFirehoseConnector extends SQLFirehoseDatabaseConnector + { + private final DBI dbi; + private final MetadataStorageConnectorConfig metadataStorageConnectorConfig; + + private TestSerdeFirehoseConnector( + @JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig + ) + { + final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig); + datasource.setDriverClassLoader(getClass().getClassLoader()); + datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver"); + this.dbi = new DBI(datasource); + this.metadataStorageConnectorConfig = metadataStorageConnectorConfig; + } + + @JsonProperty("connectorConfig") + public MetadataStorageConnectorConfig getConnectorConfig() + { + return metadataStorageConnectorConfig; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestSerdeFirehoseConnector that = (TestSerdeFirehoseConnector) o; + return metadataStorageConnectorConfig.equals(that.metadataStorageConnectorConfig); + } + + @Override + public int hashCode() + { + return Objects.hash(metadataStorageConnectorConfig); + } + + @Override + public DBI getDBI() + { + return dbi; + } + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java new file mode 100644 index 000000000000..60e7c73e4397 --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.MetadataStorageConnectorConfig; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.apache.druid.metadata.TestDerbyConnector; +import org.junit.Rule; +import org.skife.jdbi.v2.Batch; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.tweak.HandleCallback; + +public class SqlTestUtils +{ + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + private final TestDerbyFirehoseConnector derbyFirehoseConnector; + private final TestDerbyConnector derbyConnector; + + public SqlTestUtils(TestDerbyConnector derbyConnector) + { + this.derbyConnector = derbyConnector; + this.derbyFirehoseConnector = new SqlTestUtils.TestDerbyFirehoseConnector( + new MetadataStorageConnectorConfig(), + derbyConnector.getDBI() + ); + } + + private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector + { + private final DBI dbi; + + private TestDerbyFirehoseConnector( + @JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi + ) + { + final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig); + datasource.setDriverClassLoader(getClass().getClassLoader()); + datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver"); + this.dbi = dbi; + } + + @Override + public DBI getDBI() + { + return dbi; + } + } + + public void createAndUpdateTable(final String tableName, int numEntries) + { + derbyConnector.createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %1$s (\n" + + " timestamp varchar(255) NOT NULL,\n" + + " a VARCHAR(255) NOT NULL,\n" + + " b VARCHAR(255) NOT NULL\n" + + ")", + tableName + ) + ) + ); + + derbyConnector.getDBI().withHandle( + (handle) -> { + Batch batch = handle.createBatch(); + for (int i = 0; i < numEntries; i++) { + String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i); + batch.add(StringUtils.format("INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')", + tableName, timestampSt, + i, i + )); + } + batch.execute(); + return null; + } + ); + } + + public void dropTable(final String tableName) + { + derbyConnector.getDBI().withHandle( + (HandleCallback) handle -> { + handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) + .execute(); + return null; + } + ); + } + + public TestDerbyFirehoseConnector getDerbyFirehoseConnector() + { + return derbyFirehoseConnector; + } +} diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java index 34fa763613fe..189aa4984aac 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java +++ 
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java @@ -34,6 +34,7 @@ import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.metadata.input.SqlTestUtils; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.transform.TransformSpec; import org.junit.AfterClass; @@ -41,9 +42,7 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.io.File; import java.io.IOException; @@ -82,7 +81,6 @@ public class SqlFirehoseFactoryTest ) ); private TestDerbyConnector derbyConnector; - private TestDerbyFirehoseConnector derbyFirehoseConnector; @BeforeClass public static void setup() throws IOException @@ -139,56 +137,12 @@ private File createFirehoseTmpDir(String dirSuffix) throws IOException return firehoseTempDir; } - private void dropTable(final String tableName) - { - derbyConnector.getDBI().withHandle( - (HandleCallback) handle -> { - handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) - .execute(); - return null; - } - ); - } - - private void createAndUpdateTable(final String tableName) - { - derbyConnector = derbyConnectorRule.getConnector(); - derbyFirehoseConnector = new TestDerbyFirehoseConnector(new MetadataStorageConnectorConfig(), - derbyConnector.getDBI()); - derbyConnector.createTable( - tableName, - ImmutableList.of( - StringUtils.format( - "CREATE TABLE %1$s (\n" - + " timestamp varchar(255) NOT NULL,\n" - + " a VARCHAR(255) NOT NULL,\n" - + " b VARCHAR(255) NOT NULL\n" - + ")", - tableName - ) - ) - ); - - derbyConnector.getDBI().withHandle( - (handle) -> { - Batch batch = handle.createBatch(); - for (int i = 0; i < 10; i++) { - String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i); - batch.add(StringUtils.format("INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')", - tableName, timestampSt, - i, i - )); - } - batch.execute(); - return null; - } - ); - } - @Test public void testWithoutCacheAndFetch() throws Exception { - createAndUpdateTable(TABLE_NAME_1); + derbyConnector = derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( SQLLIST1, @@ -197,7 +151,7 @@ public void testWithoutCacheAndFetch() throws Exception 0L, 0L, true, - derbyFirehoseConnector, + testUtils.getDerbyFirehoseConnector(), mapper ); @@ -211,14 +165,16 @@ public void testWithoutCacheAndFetch() throws Exception assertResult(rows, SQLLIST1); assertNumRemainingCacheFiles(firehoseTmpDir, 0); - dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_1); } @Test public void testWithoutCache() throws IOException { - createAndUpdateTable(TABLE_NAME_1); + derbyConnector = derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( SQLLIST1, @@ -227,7 +183,7 @@ public void testWithoutCache() throws IOException null, null, true, - derbyFirehoseConnector, + testUtils.getDerbyFirehoseConnector(), mapper ); @@ -242,15 +198,17 @@ public void testWithoutCache() throws IOException assertResult(rows, SQLLIST1); 
assertNumRemainingCacheFiles(firehoseTmpDir, 0); - dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_1); } @Test public void testWithCacheAndFetch() throws IOException { - createAndUpdateTable(TABLE_NAME_1); - createAndUpdateTable(TABLE_NAME_2); + derbyConnector = derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); + testUtils.createAndUpdateTable(TABLE_NAME_2, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( @@ -260,7 +218,7 @@ public void testWithCacheAndFetch() throws IOException 0L, null, true, - derbyFirehoseConnector, + testUtils.getDerbyFirehoseConnector(), mapper ); @@ -274,8 +232,8 @@ public void testWithCacheAndFetch() throws IOException assertResult(rows, SQLLIST2); assertNumRemainingCacheFiles(firehoseTmpDir, 2); - dropTable(TABLE_NAME_1); - dropTable(TABLE_NAME_2); + testUtils.dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_2); } private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 7483fec93a93..f6342944a1e7 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -58,6 +58,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.http.ShuffleResource; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler; @@ -200,6 +201,7 @@ public DataNodeService getDataNodeService() new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), + new InputSourceModule(), new QueryablePeonModule(), new CliIndexerServerModule(properties), new LookupModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index d273c16ad331..8b3d12f286eb 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.worker.http.TaskManagementResource; import org.apache.druid.indexing.worker.http.WorkerResource; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.lookup.LookupSerdeModule; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager; @@ -186,6 +187,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), + new InputSourceModule(), new LookupSerdeModule() ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 21ffa9a8bff2..4baf155d229c 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -98,6 +98,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorResource; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.lookup.LookupSerdeModule; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager; @@ -347,6 +348,7 @@ private void configureOverlordHelpers(Binder binder) new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), + new InputSourceModule(), new SupervisorModule(), new LookupSerdeModule(), new SamplerModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 1160eb9240c1..9e499c3a444f 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -92,6 +92,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.loading.DataSegmentArchiver; @@ -262,6 +263,7 @@ public SegmentListerResource getSegmentListerResource( new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), new IndexingServiceTuningConfigModule(), + new InputSourceModule(), new ChatHandlerServerModule(properties), new LookupModule() ); diff --git a/website/.spelling b/website/.spelling index 7452d1403f92..faeceef243b1 100644 --- a/website/.spelling +++ b/website/.spelling @@ -98,6 +98,7 @@ IndexTask InfluxDB InputFormat InputSource +InputSources Integer.MAX_VALUE JBOD JDBC @@ -151,6 +152,7 @@ S3 SDK SIGAR SPNEGO +SqlInputSource SQLServer SSD SSDs @@ -1752,4 +1754,4 @@ UserGroupInformation CVE-2019-17571 CVE-2019-12399 CVE-2018-17196 -bin.tar.gz \ No newline at end of file +bin.tar.gz
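As a closing illustration, here is a minimal sketch of exercising the new input source end to end, modeled on `SqlInputSourceTest#testSingleSplit`. The `connector`, `mapper`, and `tempDir` arguments are assumptions supplied by the caller (a configured `SQLFirehoseDatabaseConnector`, a Smile `ObjectMapper`, and a scratch directory); `FOOS_TABLE_1` is the table the test utilities create.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.input.SqlInputSource;

class SqlInputSourceSketch
{
  // Reads every row produced by a single SQL split, mirroring testSingleSplit.
  static void readAll(SQLFirehoseDatabaseConnector connector, ObjectMapper mapper, File tempDir)
      throws IOException
  {
    SqlInputSource inputSource = new SqlInputSource(
        ImmutableList.of("SELECT timestamp, a, b FROM FOOS_TABLE_1"),
        true,      // foldCase: lower-case the column names returned by the database
        connector,
        mapper
    );
    InputRowSchema schema = new InputRowSchema(
        new TimestampSpec("timestamp", "auto", null),
        new DimensionsSpec(
            DimensionsSpec.getDefaultSchemas(Arrays.asList("a", "b")),
            new ArrayList<>(),
            new ArrayList<>()
        ),
        Collections.emptyList()
    );
    // needsFormat() is false, so no InputFormat is passed to reader().
    InputSourceReader reader = inputSource.reader(schema, null, tempDir);
    try (CloseableIterator<InputRow> rows = reader.read()) {
      while (rows.hasNext()) {
        InputRow row = rows.next(); // one InputRow per database record
      }
    }
  }
}
```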