From 64b24758bb91ff2174e80e24cd068b3987cc2ca3 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Mon, 2 Mar 2020 15:26:06 -0600 Subject: [PATCH 01/11] Add Sql InputSource --- .../common/parsers/CloseableIterator.java | 6 +- docs/ingestion/native-batch.md | 39 +++ .../apache/druid/guice/FirehoseModule.java | 4 +- .../druid/metadata/input/SqlEntity.java | 190 +++++++++++ .../druid/metadata/input/SqlInputFormat.java | 54 +++ .../druid/metadata/input/SqlInputSource.java | 149 +++++++++ .../druid/metadata/input/SqlReader.java | 110 ++++++ .../realtime/firehose/SqlFirehoseFactory.java | 84 +---- .../metadata/input/SqlInputSourceTest.java | 313 ++++++++++++++++++ 9 files changed, 863 insertions(+), 86 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java create mode 100644 server/src/main/java/org/apache/druid/metadata/input/SqlInputFormat.java create mode 100644 server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java create mode 100644 server/src/main/java/org/apache/druid/metadata/input/SqlReader.java create mode 100644 server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java index 45cda5cbc452..aca7211d40d1 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java @@ -66,10 +66,10 @@ default CloseableIterator flatMap(Function> funct return new CloseableIterator() { - CloseableIterator iterator = findNextIeteratorIfNecessary(); + CloseableIterator iterator = findNextIteratorIfNecessary(); @Nullable - private CloseableIterator findNextIeteratorIfNecessary() + private CloseableIterator findNextIteratorIfNecessary() { while ((iterator == null || !iterator.hasNext()) && delegate.hasNext()) { if (iterator != null) { @@ -104,7 +104,7 @@ public R next() return iterator.next(); } finally { - findNextIeteratorIfNecessary(); + findNextIteratorIfNecessary(); } } diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 39e838a95605..d0581896e27b 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -63,6 +63,7 @@ The supported splittable input formats for now are: - [`http`](#http-input-source) reads data from HTTP servers. - [`local`](#local-input-source) reads data from local storage. - [`druid`](#druid-input-source) reads data from a Druid datasource. +- [`sql`](#sql-input-source) reads data from a RDBMS source. Some other cloud storage types are supported with the legacy [`firehose`](#firehoses-deprecated). The below `firehose` types are also splittable. Note that only text formats are supported @@ -1304,6 +1305,43 @@ A spec that applies a filter and reads a subset of the original datasource's col This spec above will only return the `page`, `user` dimensions and `added` metric. Only rows where `page` = `Druid` will be returned. +### Sql Input Source + +The SQL input source is used to read data directly from RDBMS. +The SQL input source is _splittable_ and can be used by the [Parallel task](#parallel-task), where each worker task will read from one SQL query from the list of queries. +Since this input source has a fixed input format for reading events, no `inputFormat` field needs to be specified in the ingestion spec when using this input source. 
+ +|property|description|required?| +|--------|-----------|---------| +|type|This should be "sql".|Yes| +|database|Specifies the database connection details.|Yes| +|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|No| +|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.|Yes| + +An example SqlInputSource spec is shown below: + +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "sql", + "database": { + "type": "mysql", + "connectorConfig": { + "connectURI": "jdbc:mysql://host:port/schema", + "user": "user", + "password": "password" + } + }, + "sqls": ["SELECT * FROM table1", "SELECT * FROM table2"] + }, +... +``` + +The spec above will read all events from two separate sqls +within the interval `2013-01-01/2013-01-02`. + ## Firehoses (Deprecated) Firehoses are deprecated in 0.17.0. It's highly recommended to use the [Input source](#input-sources) instead. @@ -1538,6 +1576,7 @@ This firehose will accept any type of parser, but will only utilize the list of This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background, up to `maxFetchCapacityBytes` bytes. +This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md#parallel-task). This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. See the extension documentation for more detailed ingestion examples. Requires one of the following extensions: diff --git a/server/src/main/java/org/apache/druid/guice/FirehoseModule.java b/server/src/main/java/org/apache/druid/guice/FirehoseModule.java index c95b0cd42eec..e5f2a68639f8 100644 --- a/server/src/main/java/org/apache/druid/guice/FirehoseModule.java +++ b/server/src/main/java/org/apache/druid/guice/FirehoseModule.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.metadata.input.SqlInputSource; import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory; import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; @@ -58,7 +59,8 @@ public List getJacksonModules() new NamedType(CombiningFirehoseFactory.class, "combining"), new NamedType(FixedCountFirehoseFactory.class, "fixedCount"), new NamedType(SqlFirehoseFactory.class, "sql"), - new NamedType(InlineFirehoseFactory.class, "inline") + new NamedType(InlineFirehoseFactory.class, "inline"), + new NamedType(SqlInputSource.class, "sql") ) ); } diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java new file mode 100644 index 000000000000..f00c3e40ca81 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.ResultSetException; +import org.skife.jdbi.v2.exceptions.StatementException; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class SqlEntity implements InputEntity +{ + private static final Logger LOG = new Logger(SqlEntity.class); + + private final String sql; + private final ObjectMapper objectMapper; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final boolean foldCase; + + public SqlEntity( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + boolean foldCase, + ObjectMapper objectMapper + ) + { + this.sql = sql; + this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.foldCase = foldCase; + this.objectMapper = objectMapper; + } + + public String getSql() + { + return sql; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Please use fetch() instead"); + } + + @Override + public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException + { + final File tempFile = File.createTempFile("druid-sql-entity", ".tmp", temporaryDirectory); + return openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, tempFile); + + } + + public static CleanableFile openCleanableFile( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + ObjectMapper objectMapper, + boolean foldCase, + File tempFile + ) + throws IOException + { + Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!"); + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos); + sqlFirehoseDatabaseConnector.retryWithHandle( + (handle) -> { + ResultIterator> resultIterator = handle.createQuery( + sql + ).map( + (index, r, ctx) -> { + Map resultRow = foldCase ? 
new CaseFoldedMap() : new HashMap<>(); + ResultSetMetaData resultMetadata; + try { + resultMetadata = r.getMetaData(); + } + catch (SQLException e) { + throw new ResultSetException("Unable to obtain metadata from result set", e, ctx); + } + try { + for (int i = 1; i <= resultMetadata.getColumnCount(); i++) { + String key = resultMetadata.getColumnName(i); + String alias = resultMetadata.getColumnLabel(i); + Object value = r.getObject(i); + resultRow.put(alias != null ? alias : key, value); + } + } + catch (SQLException e) { + throw new ResultSetException("Unable to access specific metadata from " + + "result set metadata", e, ctx); + } + return resultRow; + } + ).iterator(); + jg.writeStartArray(); + while (resultIterator.hasNext()) { + jg.writeObject(resultIterator.next()); + } + jg.writeEndArray(); + jg.close(); + return null; + }, + (exception) -> { + final boolean isStatementException = exception instanceof StatementException || + (exception instanceof CallbackFailedException + && exception.getCause() instanceof StatementException); + return sqlFirehoseDatabaseConnector.isTransientException(exception) && !(isStatementException); + } + ); + } + return new CleanableFile() + { + @Override + public File file() + { + return tempFile; + } + + @Override + public void close() + { + if (!tempFile.delete()) { + LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath()); + } + } + }; + } + + private static class CaseFoldedMap extends HashMap + { + public static final long serialVersionUID = 1L; + + @Override + public Object get(Object obj) + { + return super.get(StringUtils.toLowerCase((String) obj)); + } + + @Override + public Object put(String key, Object value) + { + return super.put(StringUtils.toLowerCase(key), value); + } + + @Override + public boolean containsKey(Object obj) + { + return super.containsKey(StringUtils.toLowerCase((String) obj)); + } + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputFormat.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputFormat.java new file mode 100644 index 000000000000..6d0aa59a20d2 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputFormat.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; + +import java.io.File; + +public class SqlInputFormat implements InputFormat +{ + private final ObjectMapper objectMapper; + + public SqlInputFormat(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public boolean isSplittable() + { + return true; + } + + @Override + public InputEntityReader createReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory + ) + { + return new SqlReader(inputRowSchema, source, temporaryDirectory, objectMapper); + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java new file mode 100644 index 000000000000..57192595eaac --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +public class SqlInputSource extends AbstractInputSource implements SplittableInputSource +{ + private final List sqls; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final ObjectMapper objectMapper; + private final boolean foldCase; + + @JsonCreator + public SqlInputSource( + @JsonProperty("sqls") List sqls, + @JsonProperty("foldCase") boolean foldCase, + @JsonProperty("database") SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + @JacksonInject @Smile ObjectMapper objectMapper + ) + { + Preconditions.checkArgument(sqls.size() > 0, "No SQL queries provided"); + + this.sqls = sqls; + this.foldCase = foldCase; + this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.objectMapper = objectMapper; + } + + @JsonProperty + public List getSqls() + { + return sqls; + } + + @JsonProperty + public boolean isFoldCase() + { + return foldCase; + } + + @JsonProperty("database") + public SQLFirehoseDatabaseConnector getSQLFirehoseDatabaseConnector() + { + return sqlFirehoseDatabaseConnector; + } + + @Override + public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return sqls.stream().map(InputSplit::new); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return sqls.size(); + } + + @Override + public SplittableInputSource withSplit(InputSplit split) + { + return new SqlInputSource( + Collections.singletonList(split.get()), + foldCase, + sqlFirehoseDatabaseConnector, + objectMapper + ); + } + + @Override + protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) + { + final SqlInputFormat inputFormat = new SqlInputFormat(objectMapper); + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + createSplits(inputFormat, null) + .map(split -> new SqlEntity(split.get(), sqlFirehoseDatabaseConnector, foldCase, objectMapper)).iterator(), + temporaryDirectory + ); + } + + @Override + public boolean needsFormat() + { + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqlInputSource that = (SqlInputSource) o; + return foldCase == that.foldCase && + sqls.equals(that.sqls) && + sqlFirehoseDatabaseConnector.equals(that.sqlFirehoseDatabaseConnector) && + objectMapper.equals(that.objectMapper); + } + + @Override + 
public int hashCode() + { + return Objects.hash(sqls, sqlFirehoseDatabaseConnector, objectMapper, foldCase); + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java new file mode 100644 index 000000000000..ac41c2fe50ce --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.prefetch.JsonIterator; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class SqlReader extends IntermediateRowParsingReader> +{ + private final InputRowSchema inputRowSchema; + private final SqlEntity source; + private final File temporaryDirectory; + private final ObjectMapper objectMapper; + + + SqlReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, + ObjectMapper objectMapper + ) + { + this.inputRowSchema = inputRowSchema; + this.source = (SqlEntity) source; + this.temporaryDirectory = temporaryDirectory; + this.objectMapper = objectMapper; + } + + @Override + protected CloseableIterator> intermediateRowIterator() throws IOException + { + final Closer closer = Closer.create(); + final InputEntity.CleanableFile resultFile = closer.register(source.fetch(temporaryDirectory, null)); + FileInputStream inputStream = new FileInputStream(resultFile.file()); + JsonIterator> jsonIterator = new JsonIterator<>(new TypeReference>() + { + }, inputStream, closer, objectMapper); + return new CloseableIterator>() + { + @Override + public void close() throws IOException + { + jsonIterator.close(); + } + + @Override + public boolean hasNext() + { + return jsonIterator.hasNext(); + } + + @Override + public Map next() + { + return jsonIterator.next(); + } + }; + } + + @Override + protected List parseInputRows(Map intermediateRow) throws ParseException + { + return Collections.singletonList( + MapInputRowParser.parse( + inputRowSchema.getTimestampSpec(), + 
inputRowSchema.getDimensionsSpec(), + intermediateRow + ) + ); + } + + @Override + protected Map toMap(Map intermediateRow) + { + return intermediateRow; + } +} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java index 9daa231f9b7e..4ea88f649900 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java @@ -22,32 +22,23 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.guice.annotations.Smile; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; -import org.skife.jdbi.v2.ResultIterator; -import org.skife.jdbi.v2.exceptions.CallbackFailedException; -import org.skife.jdbi.v2.exceptions.ResultSetException; -import org.skife.jdbi.v2.exceptions.StatementException; +import org.apache.druid.metadata.input.SqlEntity; import javax.annotation.Nullable; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -94,79 +85,8 @@ public SqlFirehoseFactory( @Override protected InputStream openObjectStream(String sql, File fileName) throws IOException { - Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!"); - try (FileOutputStream fos = new FileOutputStream(fileName)) { - final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos); - sqlFirehoseDatabaseConnector.retryWithHandle( - (handle) -> { - ResultIterator> resultIterator = handle.createQuery( - sql - ).map( - (index, r, ctx) -> { - Map resultRow = foldCase ? new CaseFoldedMap() : new HashMap<>(); - ResultSetMetaData resultMetadata; - try { - resultMetadata = r.getMetaData(); - } - catch (SQLException e) { - throw new ResultSetException("Unable to obtain metadata from result set", e, ctx); - } - try { - for (int i = 1; i <= resultMetadata.getColumnCount(); i++) { - String key = resultMetadata.getColumnName(i); - String alias = resultMetadata.getColumnLabel(i); - Object value = r.getObject(i); - resultRow.put(alias != null ? 
alias : key, value); - } - } - catch (SQLException e) { - throw new ResultSetException("Unable to access specific metadata from " + - "result set metadata", e, ctx); - } - return resultRow; - } - ).iterator(); - jg.writeStartArray(); - while (resultIterator.hasNext()) { - jg.writeObject(resultIterator.next()); - } - jg.writeEndArray(); - jg.close(); - return null; - }, - (exception) -> { - final boolean isStatementException = exception instanceof StatementException || - (exception instanceof CallbackFailedException - && exception.getCause() instanceof StatementException); - return sqlFirehoseDatabaseConnector.isTransientException(exception) && !(isStatementException); - } - ); - } + SqlEntity.openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, fileName); return new FileInputStream(fileName); - - } - - private static class CaseFoldedMap extends HashMap - { - public static final long serialVersionUID = 1L; - - @Override - public Object get(Object obj) - { - return super.get(StringUtils.toLowerCase((String) obj)); - } - - @Override - public Object put(String key, Object value) - { - return super.put(StringUtils.toLowerCase(key), value); - } - - @Override - public boolean containsKey(Object obj) - { - return super.containsKey(StringUtils.toLowerCase((String) obj)); - } } @Override diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java new file mode 100644 index 000000000000..81f1649f34b3 --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.io.FileUtils; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.guice.FirehoseModule; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.metadata.MetadataStorageConnectorConfig; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.TestHelper; +import org.easymock.EasyMock; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.skife.jdbi.v2.Batch; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class SqlInputSourceTest +{ + private static final List FIREHOSE_TMP_DIRS = new ArrayList<>(); + private final String TABLE_NAME_1 = "FOOS_TABLE_1"; + private final String TABLE_NAME_2 = "FOOS_TABLE_2"; + + private final List SQLLIST1 = ImmutableList.of("SELECT timestamp,a,b FROM FOOS_TABLE_1"); + private final List SQLLIST2 = ImmutableList.of( + "SELECT timestamp,a,b FROM FOOS_TABLE_1", + "SELECT timestamp,a,b FROM FOOS_TABLE_2" + ); + + private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")), + new ArrayList<>(), + new ArrayList<>() + ), + Collections.emptyList() + ); + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + private final ObjectMapper mapper = TestHelper.makeSmileMapper(); + private TestDerbyConnector derbyConnector; + private SqlInputSourceTest.TestDerbyFirehoseConnector derbyFirehoseConnector; + + @Before + public void setUp() + { + for (Module jacksonModule : new FirehoseModule().getJacksonModules()) { + mapper.registerModule(jacksonModule); + } + } + + @AfterClass + public static void teardown() throws IOException + { + for (File dir : FIREHOSE_TMP_DIRS) { + FileUtils.forceDelete(dir); + } + } + + private void assertResult(List rows, List sqls) + { + Assert.assertEquals(10 * sqls.size(), rows.size()); + rows.sort(Comparator.comparing(Row::getTimestamp) + .thenComparingInt(r -> Integer.valueOf(r.getDimension("a").get(0))) + .thenComparingInt(r -> Integer.valueOf(r.getDimension("b").get(0)))); + int rowCount = 0; + for (int i = 0; i < 10; i++) { + for (int j = 0; j < sqls.size(); j++) { + final 
Row row = rows.get(rowCount); + String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i); + Assert.assertEquals(timestampSt, row.getTimestamp().toString()); + Assert.assertEquals(i, Integer.valueOf(row.getDimension("a").get(0)).intValue()); + Assert.assertEquals(i, Integer.valueOf(row.getDimension("b").get(0)).intValue()); + rowCount++; + } + } + } + + private File createFirehoseTmpDir(String dirSuffix) throws IOException + { + final File firehoseTempDir = File.createTempFile( + SqlInputSourceTest.class.getSimpleName(), + dirSuffix + ); + FileUtils.forceDelete(firehoseTempDir); + FileUtils.forceMkdir(firehoseTempDir); + FIREHOSE_TMP_DIRS.add(firehoseTempDir); + return firehoseTempDir; + } + + private void dropTable(final String tableName) + { + derbyConnector.getDBI().withHandle( + (HandleCallback) handle -> { + handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) + .execute(); + return null; + } + ); + } + + private void createAndUpdateTable(final String tableName) + { + derbyConnector = derbyConnectorRule.getConnector(); + derbyFirehoseConnector = new SqlInputSourceTest.TestDerbyFirehoseConnector( + new MetadataStorageConnectorConfig(), + derbyConnector.getDBI() + ); + derbyConnector.createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %1$s (\n" + + " timestamp varchar(255) NOT NULL,\n" + + " a VARCHAR(255) NOT NULL,\n" + + " b VARCHAR(255) NOT NULL\n" + + ")", + tableName + ) + ) + ); + + derbyConnector.getDBI().withHandle( + (handle) -> { + Batch batch = handle.createBatch(); + for (int i = 0; i < 10; i++) { + String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i); + batch.add(StringUtils.format("INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')", + tableName, timestampSt, + i, i + )); + } + batch.execute(); + return null; + } + ); + } + + @Test + public void testSerde() throws IOException + { + mapper.registerSubtypes(TestSerdeFirehoseConnector.class); + final SqlInputSourceTest.TestSerdeFirehoseConnector testSerdeFirehoseConnector = new SqlInputSourceTest.TestSerdeFirehoseConnector( + new MetadataStorageConnectorConfig()); + final SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testSerdeFirehoseConnector, mapper); + final String valueString = mapper.writeValueAsString(sqlInputSource); + final SqlInputSource inputSourceFromJson = mapper.readValue(valueString, SqlInputSource.class); + Assert.assertEquals(sqlInputSource, inputSourceFromJson); + } + + @Test + public void testSingleSplit() throws Exception + { + createAndUpdateTable(TABLE_NAME_1); + final File tempDir = createFirehoseTmpDir("testSingleSplit"); + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, derbyFirehoseConnector, mapper); + InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir); + CloseableIterator resultIterator = sqlReader.read(); + final List rows = new ArrayList<>(); + while (resultIterator.hasNext()) { + rows.add(resultIterator.next()); + } + assertResult(rows, SQLLIST1); + dropTable(TABLE_NAME_1); + } + + + @Test + public void testMultipleSplits() throws Exception + { + createAndUpdateTable(TABLE_NAME_1); + createAndUpdateTable(TABLE_NAME_2); + final File tempDir = createFirehoseTmpDir("testMultipleSplit"); + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, derbyFirehoseConnector, mapper); + InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir); + CloseableIterator resultIterator = 
sqlReader.read(); + final List rows = new ArrayList<>(); + while (resultIterator.hasNext()) { + rows.add(resultIterator.next()); + } + assertResult(rows, SQLLIST2); + dropTable(TABLE_NAME_1); + dropTable(TABLE_NAME_2); + } + + @Test + public void testNumSplits() + { + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, derbyFirehoseConnector, mapper); + InputFormat inputFormat = EasyMock.createMock(InputFormat.class); + Stream> sqlSplits = sqlInputSource.createSplits(inputFormat, null); + Assert.assertEquals(SQLLIST2, sqlSplits.map(InputSplit::get).collect(Collectors.toList())); + Assert.assertEquals(2, sqlInputSource.estimateNumSplits(inputFormat, null)); + } + + private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector + { + private final DBI dbi; + + private TestDerbyFirehoseConnector( + @JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi + ) + { + final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig); + datasource.setDriverClassLoader(getClass().getClassLoader()); + datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver"); + this.dbi = dbi; + } + + @Override + public DBI getDBI() + { + return dbi; + } + } + + @JsonTypeName("test") + private static class TestSerdeFirehoseConnector extends SQLFirehoseDatabaseConnector + { + private final DBI dbi; + private final MetadataStorageConnectorConfig metadataStorageConnectorConfig; + + private TestSerdeFirehoseConnector( + @JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig + ) + { + final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig); + datasource.setDriverClassLoader(getClass().getClassLoader()); + datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver"); + this.dbi = new DBI(datasource); + this.metadataStorageConnectorConfig = metadataStorageConnectorConfig; + } + + @JsonProperty("connectorConfig") + public MetadataStorageConnectorConfig getConnectorConfig() + { + return metadataStorageConnectorConfig; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestSerdeFirehoseConnector that = (TestSerdeFirehoseConnector) o; + return metadataStorageConnectorConfig.equals(that.metadataStorageConnectorConfig); + } + + @Override + public int hashCode() + { + return Objects.hash(metadataStorageConnectorConfig); + } + + @Override + public DBI getDBI() + { + return dbi; + } + } +} From 61f62fe01a64534a485b0f8d9176ca6f18186b55 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Mon, 2 Mar 2020 16:16:16 -0600 Subject: [PATCH 02/11] Add spelling --- website/.spelling | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/website/.spelling b/website/.spelling index eed68eeb13c0..a072fcbb4b0b 100644 --- a/website/.spelling +++ b/website/.spelling @@ -149,6 +149,7 @@ S3 SDK SIGAR SPNEGO +SqlInputSource SQLServer SSD SSDs @@ -1713,4 +1714,4 @@ isRobot isUnpatrolled metroCode regionIsoCode -regionName \ No newline at end of file +regionName From d1185e49933fea9baf4e4b28c8d43417e7c543b0 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Tue, 5 May 2020 14:18:46 -0500 Subject: [PATCH 03/11] Use separate DruidModule --- .../apache/druid/guice/FirehoseModule.java | 4 +- .../metadata/input/SqlInputSourceModule.java | 48 +++++++++++++++++++ .../java/org/apache/druid/cli/CliIndexer.java | 2 + .../apache/druid/cli/CliMiddleManager.java | 2 + 
.../org/apache/druid/cli/CliOverlord.java | 2 + .../java/org/apache/druid/cli/CliPeon.java | 2 + 6 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/metadata/input/SqlInputSourceModule.java diff --git a/server/src/main/java/org/apache/druid/guice/FirehoseModule.java b/server/src/main/java/org/apache/druid/guice/FirehoseModule.java index e5f2a68639f8..c95b0cd42eec 100644 --- a/server/src/main/java/org/apache/druid/guice/FirehoseModule.java +++ b/server/src/main/java/org/apache/druid/guice/FirehoseModule.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import org.apache.druid.initialization.DruidModule; -import org.apache.druid.metadata.input.SqlInputSource; import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory; import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; @@ -59,8 +58,7 @@ public List getJacksonModules() new NamedType(CombiningFirehoseFactory.class, "combining"), new NamedType(FixedCountFirehoseFactory.class, "fixedCount"), new NamedType(SqlFirehoseFactory.class, "sql"), - new NamedType(InlineFirehoseFactory.class, "inline"), - new NamedType(SqlInputSource.class, "sql") + new NamedType(InlineFirehoseFactory.class, "inline") ) ); } diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSourceModule.java new file mode 100644 index 000000000000..ebe5f1451680 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSourceModule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; + +import java.util.List; + +public class SqlInputSourceModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("SqlInputSourceModule") + .registerSubtypes( + new NamedType(SqlInputSource.class, "sql") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 7483fec93a93..cf0b3a41a44e 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -58,6 +58,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.http.ShuffleResource; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.input.SqlInputSourceModule; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler; @@ -200,6 +201,7 @@ public DataNodeService getDataNodeService() new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), + new SqlInputSourceModule(), new QueryablePeonModule(), new CliIndexerServerModule(properties), new LookupModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index d273c16ad331..ee28cef6d42e 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.worker.http.TaskManagementResource; import org.apache.druid.indexing.worker.http.WorkerResource; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.input.SqlInputSourceModule; import org.apache.druid.query.lookup.LookupSerdeModule; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager; @@ -186,6 +187,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), + new SqlInputSourceModule(), new LookupSerdeModule() ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 21ffa9a8bff2..8f5bafe800d1 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -98,6 +98,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorResource; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.input.SqlInputSourceModule; import org.apache.druid.query.lookup.LookupSerdeModule; import 
org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager; @@ -347,6 +348,7 @@ private void configureOverlordHelpers(Binder binder) new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), + new SqlInputSourceModule(), new SupervisorModule(), new LookupSerdeModule(), new SamplerModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 1160eb9240c1..6bc1f045e01b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -92,6 +92,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.metadata.input.SqlInputSourceModule; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.loading.DataSegmentArchiver; @@ -262,6 +263,7 @@ public SegmentListerResource getSegmentListerResource( new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), new IndexingServiceTuningConfigModule(), + new SqlInputSourceModule(), new ChatHandlerServerModule(properties), new LookupModule() ); From 7f9743b112d910d50e6348ddb098dd1532650027 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Tue, 5 May 2020 15:05:20 -0500 Subject: [PATCH 04/11] Change module name --- .../{SqlInputSourceModule.java => InputSourceModule.java} | 4 ++-- services/src/main/java/org/apache/druid/cli/CliIndexer.java | 4 ++-- .../src/main/java/org/apache/druid/cli/CliMiddleManager.java | 4 ++-- services/src/main/java/org/apache/druid/cli/CliOverlord.java | 4 ++-- services/src/main/java/org/apache/druid/cli/CliPeon.java | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) rename server/src/main/java/org/apache/druid/metadata/input/{SqlInputSourceModule.java => InputSourceModule.java} (93%) diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java similarity index 93% rename from server/src/main/java/org/apache/druid/metadata/input/SqlInputSourceModule.java rename to server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java index ebe5f1451680..c6d0a973a0f1 100644 --- a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSourceModule.java +++ b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java @@ -28,13 +28,13 @@ import java.util.List; -public class SqlInputSourceModule implements DruidModule +public class InputSourceModule implements DruidModule { @Override public List getJacksonModules() { return ImmutableList.of( - new SimpleModule("SqlInputSourceModule") + new SimpleModule("InputSourceModule") .registerSubtypes( new NamedType(SqlInputSource.class, "sql") ) diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index cf0b3a41a44e..f6342944a1e7 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -58,7 +58,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.http.ShuffleResource; import 
org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.input.SqlInputSourceModule; +import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler; @@ -201,7 +201,7 @@ public DataNodeService getDataNodeService() new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), - new SqlInputSourceModule(), + new InputSourceModule(), new QueryablePeonModule(), new CliIndexerServerModule(properties), new LookupModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index ee28cef6d42e..8b3d12f286eb 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -61,7 +61,7 @@ import org.apache.druid.indexing.worker.http.TaskManagementResource; import org.apache.druid.indexing.worker.http.WorkerResource; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.input.SqlInputSourceModule; +import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.lookup.LookupSerdeModule; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager; @@ -187,7 +187,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), - new SqlInputSourceModule(), + new InputSourceModule(), new LookupSerdeModule() ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 8f5bafe800d1..4baf155d229c 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -98,7 +98,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorResource; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.input.SqlInputSourceModule; +import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.lookup.LookupSerdeModule; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager; @@ -348,7 +348,7 @@ private void configureOverlordHelpers(Binder binder) new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), - new SqlInputSourceModule(), + new InputSourceModule(), new SupervisorModule(), new LookupSerdeModule(), new SamplerModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 6bc1f045e01b..9e499c3a444f 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -92,7 +92,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; 
-import org.apache.druid.metadata.input.SqlInputSourceModule; +import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.loading.DataSegmentArchiver; @@ -263,7 +263,7 @@ public SegmentListerResource getSegmentListerResource( new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), new IndexingServiceTuningConfigModule(), - new SqlInputSourceModule(), + new InputSourceModule(), new ChatHandlerServerModule(properties), new LookupModule() ); From 633323490f39fc279a874ae72d549c6e34762211 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Thu, 28 May 2020 17:18:49 -0500 Subject: [PATCH 05/11] Fix docs --- .../input/impl/prefetch/JsonIterator.java | 3 +- docs/ingestion/native-batch.md | 23 +++- .../metadata/input/InputSourceModule.java | 3 + .../druid/metadata/input/SqlEntity.java | 12 +- .../druid/metadata/input/SqlInputSource.java | 10 +- .../druid/metadata/input/SqlReader.java | 4 + .../realtime/firehose/SqlFirehoseFactory.java | 5 +- .../druid/metadata/input/SqlEntityTest.java | 84 +++++++++++++ .../metadata/input/SqlInputSourceTest.java | 89 +++++--------- .../druid/metadata/input/SqlTestUtils.java | 112 ++++++++++++++++++ .../firehose/SqlFirehoseFactoryTest.java | 69 ++--------- 11 files changed, 285 insertions(+), 129 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java create mode 100644 server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java index 2a241f66de7c..a7db26dbc9ba 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.parsers.CloseableIterator; import java.io.Closeable; import java.io.IOException; @@ -39,7 +40,7 @@ * * @param the type of object returned by this iterator */ -public class JsonIterator implements Iterator, Closeable +public class JsonIterator implements CloseableIterator { private JsonParser jp; private ObjectCodec objectCodec; diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index c56422561117..ffd2d8d0320a 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -53,7 +53,7 @@ The detailed behavior of the Parallel task is different depending on the [`parti See each `partitionsSpec` for more details. To use this task, the [`inputSource`](#input-sources) in the `ioConfig` should be _splittable_ and `maxNumConcurrentSubTasks` should be set to larger than 1 in the `tuningConfig`. -Otherwise, this task runs sequentially; the `index_paralllel` task reads each input file one by one and creates segments by itself. +Otherwise, this task runs sequentially; the `index_parallel` task reads each input file one by one and creates segments by itself. The supported splittable input formats for now are: - [`s3`](#s3-input-source) reads data from AWS S3 storage. 
@@ -1311,7 +1311,7 @@ A spec that applies a filter and reads a subset of the original datasource's col This spec above will only return the `page`, `user` dimensions and `added` metric. Only rows where `page` = `Druid` will be returned. -### Sql Input Source +### SQL Input Source The SQL input source is used to read data directly from RDBMS. The SQL input source is _splittable_ and can be used by the [Parallel task](#parallel-task), where each worker task will read from one SQL query from the list of queries. @@ -1320,7 +1320,7 @@ Since this input source has a fixed input format for reading events, no `inputFo |property|description|required?| |--------|-----------|---------| |type|This should be "sql".|Yes| -|database|Specifies the database connection details.|Yes| +|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support and this extension must be loaded into Druid. For database types `mysql` and `postgresql`, the `connectorConfig` support is provided by [mysql-metadata-storage](../development/extensions-core/mysql.md) and [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extensions respectively.|Yes| |foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|No| |sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.|Yes| @@ -1345,8 +1345,21 @@ An example SqlInputSource spec is shown below: ... ``` -The spec above will read all events from two separate sqls -within the interval `2013-01-01/2013-01-02`. +The spec above will read all events from two separate SQLs within the interval `2013-01-01/2013-01-02`. +Each of the SQL queries will be run in its own sub-task and thus for the above example, there would be two sub-tasks. + +Compared to the other native batch InputSources, SQL InputSource behaves differently in terms of reading the input data and so it would be helpful to consider the following points before using this InputSource in a production environment: + +* During indexing, each task would execute one of the SQL queries and the results are stored locally on disk. The tasks then proceed to read the data from these local input files and generate segments. Presently, there isn’t a restriction on the size of the generated files and this would require the MiddleManagers or Indexers to have sufficient disk capacity based on the volume of data being indexed. + +* Filtering the SQL queries based on the intervals specified in the `granularitySpec` can avoid unwanted data being retrieved and stored by the indexing process. + +* Pagination may be used on the SQL queries to ensure that each query pulls a similar amount of data, thereby improving the efficiency of the sub-tasks. + +* Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`. 
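+
+For illustration, assuming a hypothetical `FOOS_TABLE` with a `timestamp` column (the table and column names here are examples only, not part of any Druid API), the entries in `sqls` could bound each query to the ingestion interval and use MySQL/PostgreSQL-style `LIMIT`/`OFFSET` pagination so that each sub-task pulls a similar amount of data:
+
+```sql
+-- Split 1: first page of rows within the ingestion interval
+SELECT timestamp, a, b
+FROM FOOS_TABLE
+WHERE timestamp >= '2013-01-01' AND timestamp < '2013-01-02'
+ORDER BY timestamp
+LIMIT 5000 OFFSET 0;
+
+-- Split 2: next page of rows within the same interval
+SELECT timestamp, a, b
+FROM FOOS_TABLE
+WHERE timestamp >= '2013-01-01' AND timestamp < '2013-01-02'
+ORDER BY timestamp
+LIMIT 5000 OFFSET 5000;
+```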
+
+
 ## Firehoses (Deprecated)

diff --git a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
index c6d0a973a0f1..0423af4f9a03 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
@@ -28,6 +28,9 @@

 import java.util.List;

+/**
+ * Module that installs {@link org.apache.druid.data.input.InputSource} implementations
+ */
 public class InputSourceModule implements DruidModule
 {
   @Override
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
index f00c3e40ca81..647de9000fcb 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
@@ -42,6 +42,9 @@
 import java.util.HashMap;
 import java.util.Map;

+/**
+ * Represents an RDBMS-based input resource and knows how to read query results from the resource using SQL queries.
+ */
 public class SqlEntity implements InputEntity
 {
   private static final Logger LOG = new Logger(SqlEntity.class);
@@ -59,7 +62,10 @@ public SqlEntity(
   )
   {
     this.sql = sql;
-    this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector;
+    this.sqlFirehoseDatabaseConnector = Preconditions.checkNotNull(
+        sqlFirehoseDatabaseConnector,
+        "SQL Metadata Connector not configured!"
+    );
     this.foldCase = foldCase;
     this.objectMapper = objectMapper;
   }
@@ -99,9 +105,11 @@ public static CleanableFile openCleanableFile(
   )
       throws IOException
   {
-    Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!");
     try (FileOutputStream fos = new FileOutputStream(tempFile)) {
       final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos);
+
+      // Execute the sql query and lazily retrieve the results into the file in json format.
+      // foldCase is useful to handle differences in case sensitivity behavior across databases.
       sqlFirehoseDatabaseConnector.retryWithHandle(
           (handle) -> {
             ResultIterator<Map<String, Object>> resultIterator = handle.createQuery(
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
index 57192595eaac..c7dfbb7fa365 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
@@ -61,7 +61,10 @@ public SqlInputSource(

     this.sqls = sqls;
     this.foldCase = foldCase;
-    this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector;
+    this.sqlFirehoseDatabaseConnector = Preconditions.checkNotNull(
+        sqlFirehoseDatabaseConnector,
+        "SQL Metadata Connector not configured!"
+    );
     this.objectMapper = objectMapper;
   }

@@ -137,13 +140,12 @@ public boolean equals(Object o)
     SqlInputSource that = (SqlInputSource) o;
     return foldCase == that.foldCase &&
            sqls.equals(that.sqls) &&
-           sqlFirehoseDatabaseConnector.equals(that.sqlFirehoseDatabaseConnector) &&
-           objectMapper.equals(that.objectMapper);
+           sqlFirehoseDatabaseConnector.equals(that.sqlFirehoseDatabaseConnector);
   }

   @Override
   public int hashCode()
   {
-    return Objects.hash(sqls, sqlFirehoseDatabaseConnector, objectMapper, foldCase);
+    return Objects.hash(sqls, sqlFirehoseDatabaseConnector, foldCase);
   }
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
index ac41c2fe50ce..e4b900835f50 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
@@ -38,6 +38,9 @@
 import java.util.List;
 import java.util.Map;

+/**
+ * Reader exclusively for {@link SqlEntity}
+ */
 public class SqlReader extends IntermediateRowParsingReader<Map<String, Object>>
 {
   private final InputRowSchema inputRowSchema;
@@ -63,6 +66,7 @@ public class SqlReader extends IntermediateRowParsingReader<Map<String, Object>>
   protected CloseableIterator<Map<String, Object>> intermediateRowIterator() throws IOException
   {
     final Closer closer = Closer.create();
+    // The results are fetched into local storage first, since this avoids keeping a persistent database connection open for a long time.
     final InputEntity.CleanableFile resultFile = closer.register(source.fetch(temporaryDirectory, null));
     FileInputStream inputStream = new FileInputStream(resultFile.file());
     JsonIterator<Map<String, Object>> jsonIterator = new JsonIterator<>(new TypeReference<Map<String, Object>>()
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
index 4ea88f649900..0b5863d671d6 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
@@ -77,7 +77,10 @@ public SqlFirehoseFactory(

     this.sqls = sqls;
     this.objectMapper = objectMapper;
-    this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector;
+    this.sqlFirehoseDatabaseConnector = Preconditions.checkNotNull(
+        sqlFirehoseDatabaseConnector,
+        "SQL Metadata Connector not configured!"
+    );
     this.foldCase = foldCase;
     this.connectorConfig = null;
   }
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java
new file mode 100644
index 000000000000..45a56ea833c2
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +public class SqlEntityTest +{ + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + private final ObjectMapper mapper = TestHelper.makeSmileMapper(); + private TestDerbyConnector derbyConnector; + String TABLE_NAME_1 = "FOOS_TABLE"; + + String TEST_SQL = "SELECT timestamp,a,b FROM FOOS_TABLE"; + String resultJson = "[{\"a\":\"0\"," + + "\"b\":\"0\"," + + "\"timestamp\":\"2011-01-12T00:00:00.000Z\"" + + "}]"; + + @Before + public void setUp() + { + for (Module jacksonModule : new InputSourceModule().getJacksonModules()) { + mapper.registerModule(jacksonModule); + } + } + + @Test + public void testExecuteQuery() throws IOException + { + SqlTestUtils testUtils = new SqlTestUtils(); + derbyConnector = derbyConnectorRule.getConnector(); + testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 1); + File tmpFile = File.createTempFile( + "testQueryResults", + "" + ); + InputEntity.CleanableFile queryResult = SqlEntity.openCleanableFile( + TEST_SQL, + testUtils.getDerbyFirehoseConnector(), + mapper, + true, + tmpFile + ); + InputStream queryInputStream = new FileInputStream(queryResult.file()); + String actualJson = IOUtils.toString(queryInputStream, StandardCharsets.UTF_8); + + Assert.assertEquals(actualJson, resultJson); + testUtils.dropTable(TABLE_NAME_1, derbyConnector); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java index 81f1649f34b3..0be090da3040 100644 --- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.io.FileUtils; import org.apache.druid.data.input.InputFormat; @@ -34,7 +35,6 @@ import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; -import org.apache.druid.guice.FirehoseModule; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.MetadataStorageConnectorConfig; @@ -47,9 +47,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.io.File; import java.io.IOException; @@ -92,7 +90,7 @@ public class SqlInputSourceTest @Before public void setUp() { - for (Module 
jacksonModule : new FirehoseModule().getJacksonModules()) { + for (Module jacksonModule : new InputSourceModule().getJacksonModules()) { mapper.registerModule(jacksonModule); } } @@ -136,54 +134,6 @@ private File createFirehoseTmpDir(String dirSuffix) throws IOException return firehoseTempDir; } - private void dropTable(final String tableName) - { - derbyConnector.getDBI().withHandle( - (HandleCallback) handle -> { - handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) - .execute(); - return null; - } - ); - } - - private void createAndUpdateTable(final String tableName) - { - derbyConnector = derbyConnectorRule.getConnector(); - derbyFirehoseConnector = new SqlInputSourceTest.TestDerbyFirehoseConnector( - new MetadataStorageConnectorConfig(), - derbyConnector.getDBI() - ); - derbyConnector.createTable( - tableName, - ImmutableList.of( - StringUtils.format( - "CREATE TABLE %1$s (\n" - + " timestamp varchar(255) NOT NULL,\n" - + " a VARCHAR(255) NOT NULL,\n" - + " b VARCHAR(255) NOT NULL\n" - + ")", - tableName - ) - ) - ); - - derbyConnector.getDBI().withHandle( - (handle) -> { - Batch batch = handle.createBatch(); - for (int i = 0; i < 10; i++) { - String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i); - batch.add(StringUtils.format("INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')", - tableName, timestampSt, - i, i - )); - } - batch.execute(); - return null; - } - ); - } - @Test public void testSerde() throws IOException { @@ -199,9 +149,11 @@ public void testSerde() throws IOException @Test public void testSingleSplit() throws Exception { - createAndUpdateTable(TABLE_NAME_1); + SqlTestUtils testUtils = new SqlTestUtils(); + derbyConnector = derbyConnectorRule.getConnector(); + testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); final File tempDir = createFirehoseTmpDir("testSingleSplit"); - SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, derbyFirehoseConnector, mapper); + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testUtils.getDerbyFirehoseConnector(), mapper); InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir); CloseableIterator resultIterator = sqlReader.read(); final List rows = new ArrayList<>(); @@ -209,15 +161,17 @@ public void testSingleSplit() throws Exception rows.add(resultIterator.next()); } assertResult(rows, SQLLIST1); - dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_1, derbyConnector); } @Test public void testMultipleSplits() throws Exception { - createAndUpdateTable(TABLE_NAME_1); - createAndUpdateTable(TABLE_NAME_2); + SqlTestUtils testUtils = new SqlTestUtils(); + derbyConnector = derbyConnectorRule.getConnector(); + testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); + testUtils.createAndUpdateTable(TABLE_NAME_2, derbyConnector, 10); final File tempDir = createFirehoseTmpDir("testMultipleSplit"); SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, derbyFirehoseConnector, mapper); InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir); @@ -227,8 +181,8 @@ public void testMultipleSplits() throws Exception rows.add(resultIterator.next()); } assertResult(rows, SQLLIST2); - dropTable(TABLE_NAME_1); - dropTable(TABLE_NAME_2); + testUtils.dropTable(TABLE_NAME_1, derbyConnector); + testUtils.dropTable(TABLE_NAME_2, derbyConnector); } @Test @@ -241,7 +195,22 @@ public void testNumSplits() Assert.assertEquals(2, sqlInputSource.estimateNumSplits(inputFormat, 
null)); } - private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector + @Test + public void testEquals() + { + EqualsVerifier.forClass(SqlInputSource.class) + .withPrefabValues( + ObjectMapper.class, + new ObjectMapper(), + new ObjectMapper() + ) + .withIgnoredFields("objectMapper") + .withNonnullFields("sqls", "sqlFirehoseDatabaseConnector") + .usingGetClass() + .verify(); + } + + protected static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector { private final DBI dbi; diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java new file mode 100644 index 000000000000..38d1fdecc37a --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.MetadataStorageConnectorConfig; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.apache.druid.metadata.TestDerbyConnector; +import org.junit.Rule; +import org.skife.jdbi.v2.Batch; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.tweak.HandleCallback; + +public class SqlTestUtils +{ + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + private TestDerbyFirehoseConnector derbyFirehoseConnector; + + private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector + { + private final DBI dbi; + + private TestDerbyFirehoseConnector( + @JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi + ) + { + final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig); + datasource.setDriverClassLoader(getClass().getClassLoader()); + datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver"); + this.dbi = dbi; + } + + @Override + public DBI getDBI() + { + return dbi; + } + } + + public void createAndUpdateTable(final String tableName, TestDerbyConnector derbyConnector, int numEntries) + { + derbyFirehoseConnector = new SqlTestUtils.TestDerbyFirehoseConnector( + new MetadataStorageConnectorConfig(), + derbyConnector.getDBI() + ); + derbyConnector.createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %1$s (\n" + + " timestamp varchar(255) NOT NULL,\n" + + " a VARCHAR(255) NOT NULL,\n" + + " b VARCHAR(255) NOT NULL\n" + + ")", + tableName + ) + ) + ); + + 
derbyConnector.getDBI().withHandle( + (handle) -> { + Batch batch = handle.createBatch(); + for (int i = 0; i < numEntries; i++) { + String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i); + batch.add(StringUtils.format("INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')", + tableName, timestampSt, + i, i + )); + } + batch.execute(); + return null; + } + ); + } + + public void dropTable(final String tableName, TestDerbyConnector derbyConnector) + { + derbyConnector.getDBI().withHandle( + (HandleCallback) handle -> { + handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) + .execute(); + return null; + } + ); + } + + public TestDerbyFirehoseConnector getDerbyFirehoseConnector() + { + return derbyFirehoseConnector; + } +} diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java index 34fa763613fe..9041f3e80e4a 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java @@ -34,6 +34,7 @@ import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.metadata.input.SqlTestUtils; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.transform.TransformSpec; import org.junit.AfterClass; @@ -41,9 +42,7 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.io.File; import java.io.IOException; @@ -139,56 +138,12 @@ private File createFirehoseTmpDir(String dirSuffix) throws IOException return firehoseTempDir; } - private void dropTable(final String tableName) - { - derbyConnector.getDBI().withHandle( - (HandleCallback) handle -> { - handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) - .execute(); - return null; - } - ); - } - - private void createAndUpdateTable(final String tableName) - { - derbyConnector = derbyConnectorRule.getConnector(); - derbyFirehoseConnector = new TestDerbyFirehoseConnector(new MetadataStorageConnectorConfig(), - derbyConnector.getDBI()); - derbyConnector.createTable( - tableName, - ImmutableList.of( - StringUtils.format( - "CREATE TABLE %1$s (\n" - + " timestamp varchar(255) NOT NULL,\n" - + " a VARCHAR(255) NOT NULL,\n" - + " b VARCHAR(255) NOT NULL\n" - + ")", - tableName - ) - ) - ); - - derbyConnector.getDBI().withHandle( - (handle) -> { - Batch batch = handle.createBatch(); - for (int i = 0; i < 10; i++) { - String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i); - batch.add(StringUtils.format("INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')", - tableName, timestampSt, - i, i - )); - } - batch.execute(); - return null; - } - ); - } - @Test public void testWithoutCacheAndFetch() throws Exception { - createAndUpdateTable(TABLE_NAME_1); + SqlTestUtils testUtils = new SqlTestUtils(); + derbyConnector = derbyConnectorRule.getConnector(); + testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( SQLLIST1, @@ -211,14 +166,15 @@ public void testWithoutCacheAndFetch() throws Exception assertResult(rows, SQLLIST1); 
assertNumRemainingCacheFiles(firehoseTmpDir, 0); - dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_1, derbyConnector); } @Test public void testWithoutCache() throws IOException { - createAndUpdateTable(TABLE_NAME_1); + SqlTestUtils testUtils = new SqlTestUtils(); + testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( SQLLIST1, @@ -242,15 +198,16 @@ public void testWithoutCache() throws IOException assertResult(rows, SQLLIST1); assertNumRemainingCacheFiles(firehoseTmpDir, 0); - dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_1, derbyConnector); } @Test public void testWithCacheAndFetch() throws IOException { - createAndUpdateTable(TABLE_NAME_1); - createAndUpdateTable(TABLE_NAME_2); + SqlTestUtils testUtils = new SqlTestUtils(); + testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); + testUtils.createAndUpdateTable(TABLE_NAME_2, derbyConnector, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( @@ -274,8 +231,8 @@ public void testWithCacheAndFetch() throws IOException assertResult(rows, SQLLIST2); assertNumRemainingCacheFiles(firehoseTmpDir, 2); - dropTable(TABLE_NAME_1); - dropTable(TABLE_NAME_2); + testUtils.dropTable(TABLE_NAME_1, derbyConnector); + testUtils.dropTable(TABLE_NAME_2, derbyConnector); } private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector From 0328eaded9f4b85649316bfca72a79e5b7ed1db9 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Fri, 29 May 2020 11:13:07 -0500 Subject: [PATCH 06/11] Use sqltestutils for tests --- .../input/impl/prefetch/JsonIterator.java | 1 - docs/ingestion/native-batch.md | 4 +- .../druid/metadata/input/SqlEntityTest.java | 6 +-- .../metadata/input/SqlInputSourceTest.java | 44 +++++-------------- .../druid/metadata/input/SqlTestUtils.java | 20 ++++++--- .../firehose/SqlFirehoseFactoryTest.java | 31 ++++++------- 6 files changed, 46 insertions(+), 60 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java index a7db26dbc9ba..c03e5f6d8e84 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java @@ -32,7 +32,6 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.util.Iterator; import java.util.NoSuchElementException; /** diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index ffd2d8d0320a..39e12b0ac176 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -1350,9 +1350,9 @@ Each of the SQL queries will be run in its own sub-task and thus for the above e Compared to the other native batch InputSources, SQL InputSource behaves differently in terms of reading the input data and so it would be helpful to consider the following points before using this InputSource in a production environment: -* During indexing, each task would execute one of the SQL queries and the results are stored locally on disk. The tasks then proceed to read the data from these local input files and generate segments. Presently, there isn’t a restriction on the size of the generated files and this would require the MiddleManagers or Indexers to have sufficient disk capacity based on the volume of data being indexed. 
+* During indexing, each sub-task would execute one of the SQL queries and the results are stored locally on disk. The sub-tasks then proceed to read the data from these local input files and generate segments. Presently, there isn’t any restriction on the size of the generated files and this would require the MiddleManagers or Indexers to have sufficient disk capacity based on the volume of data being indexed. -* Filtering the SQL queries based on the intervals specified in the `granularitySpec` can avoid unwanted data being retrieved and stored by the indexing process. +* Filtering the SQL queries based on the intervals specified in the `granularitySpec` can avoid unwanted data being retrieved and stored locally by the indexing sub-tasks. * Pagination may be used on the SQL queries to ensure that each query pulls a similar amount of data, thereby improving the efficiency of the sub-tasks. diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java index 45a56ea833c2..50a0288d26c2 100644 --- a/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java @@ -61,9 +61,9 @@ public void setUp() @Test public void testExecuteQuery() throws IOException { - SqlTestUtils testUtils = new SqlTestUtils(); derbyConnector = derbyConnectorRule.getConnector(); - testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 1); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 1); File tmpFile = File.createTempFile( "testQueryResults", "" @@ -79,6 +79,6 @@ public void testExecuteQuery() throws IOException String actualJson = IOUtils.toString(queryInputStream, StandardCharsets.UTF_8); Assert.assertEquals(actualJson, resultJson); - testUtils.dropTable(TABLE_NAME_1, derbyConnector); + testUtils.dropTable(TABLE_NAME_1); } } diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java index 0be090da3040..7afa88894f26 100644 --- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java @@ -85,7 +85,6 @@ public class SqlInputSourceTest public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); private final ObjectMapper mapper = TestHelper.makeSmileMapper(); private TestDerbyConnector derbyConnector; - private SqlInputSourceTest.TestDerbyFirehoseConnector derbyFirehoseConnector; @Before public void setUp() @@ -149,9 +148,9 @@ public void testSerde() throws IOException @Test public void testSingleSplit() throws Exception { - SqlTestUtils testUtils = new SqlTestUtils(); derbyConnector = derbyConnectorRule.getConnector(); - testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); final File tempDir = createFirehoseTmpDir("testSingleSplit"); SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testUtils.getDerbyFirehoseConnector(), mapper); InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir); @@ -161,19 +160,19 @@ public void testSingleSplit() throws Exception rows.add(resultIterator.next()); } assertResult(rows, SQLLIST1); - 
testUtils.dropTable(TABLE_NAME_1, derbyConnector); + testUtils.dropTable(TABLE_NAME_1); } @Test public void testMultipleSplits() throws Exception { - SqlTestUtils testUtils = new SqlTestUtils(); derbyConnector = derbyConnectorRule.getConnector(); - testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); - testUtils.createAndUpdateTable(TABLE_NAME_2, derbyConnector, 10); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); + testUtils.createAndUpdateTable(TABLE_NAME_2, 10); final File tempDir = createFirehoseTmpDir("testMultipleSplit"); - SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, derbyFirehoseConnector, mapper); + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper); InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir); CloseableIterator resultIterator = sqlReader.read(); final List rows = new ArrayList<>(); @@ -181,14 +180,16 @@ public void testMultipleSplits() throws Exception rows.add(resultIterator.next()); } assertResult(rows, SQLLIST2); - testUtils.dropTable(TABLE_NAME_1, derbyConnector); - testUtils.dropTable(TABLE_NAME_2, derbyConnector); + testUtils.dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_2); } @Test public void testNumSplits() { - SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, derbyFirehoseConnector, mapper); + derbyConnector = derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper); InputFormat inputFormat = EasyMock.createMock(InputFormat.class); Stream> sqlSplits = sqlInputSource.createSplits(inputFormat, null); Assert.assertEquals(SQLLIST2, sqlSplits.map(InputSplit::get).collect(Collectors.toList())); @@ -210,27 +211,6 @@ public void testEquals() .verify(); } - protected static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector - { - private final DBI dbi; - - private TestDerbyFirehoseConnector( - @JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi - ) - { - final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig); - datasource.setDriverClassLoader(getClass().getClassLoader()); - datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver"); - this.dbi = dbi; - } - - @Override - public DBI getDBI() - { - return dbi; - } - } - @JsonTypeName("test") private static class TestSerdeFirehoseConnector extends SQLFirehoseDatabaseConnector { diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java index 38d1fdecc37a..60e7c73e4397 100644 --- a/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java @@ -35,7 +35,17 @@ public class SqlTestUtils { @Rule public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); - private TestDerbyFirehoseConnector derbyFirehoseConnector; + private final TestDerbyFirehoseConnector derbyFirehoseConnector; + private final TestDerbyConnector derbyConnector; + + public SqlTestUtils(TestDerbyConnector derbyConnector) + { + this.derbyConnector = derbyConnector; + this.derbyFirehoseConnector = new SqlTestUtils.TestDerbyFirehoseConnector( + 
new MetadataStorageConnectorConfig(), + derbyConnector.getDBI() + ); + } private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector { @@ -58,12 +68,8 @@ public DBI getDBI() } } - public void createAndUpdateTable(final String tableName, TestDerbyConnector derbyConnector, int numEntries) + public void createAndUpdateTable(final String tableName, int numEntries) { - derbyFirehoseConnector = new SqlTestUtils.TestDerbyFirehoseConnector( - new MetadataStorageConnectorConfig(), - derbyConnector.getDBI() - ); derbyConnector.createTable( tableName, ImmutableList.of( @@ -94,7 +100,7 @@ public void createAndUpdateTable(final String tableName, TestDerbyConnector derb ); } - public void dropTable(final String tableName, TestDerbyConnector derbyConnector) + public void dropTable(final String tableName) { derbyConnector.getDBI().withHandle( (HandleCallback) handle -> { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java index 9041f3e80e4a..189aa4984aac 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java @@ -81,7 +81,6 @@ public class SqlFirehoseFactoryTest ) ); private TestDerbyConnector derbyConnector; - private TestDerbyFirehoseConnector derbyFirehoseConnector; @BeforeClass public static void setup() throws IOException @@ -141,9 +140,9 @@ private File createFirehoseTmpDir(String dirSuffix) throws IOException @Test public void testWithoutCacheAndFetch() throws Exception { - SqlTestUtils testUtils = new SqlTestUtils(); derbyConnector = derbyConnectorRule.getConnector(); - testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( SQLLIST1, @@ -152,7 +151,7 @@ public void testWithoutCacheAndFetch() throws Exception 0L, 0L, true, - derbyFirehoseConnector, + testUtils.getDerbyFirehoseConnector(), mapper ); @@ -166,15 +165,16 @@ public void testWithoutCacheAndFetch() throws Exception assertResult(rows, SQLLIST1); assertNumRemainingCacheFiles(firehoseTmpDir, 0); - testUtils.dropTable(TABLE_NAME_1, derbyConnector); + testUtils.dropTable(TABLE_NAME_1); } @Test public void testWithoutCache() throws IOException { - SqlTestUtils testUtils = new SqlTestUtils(); - testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); + derbyConnector = derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( SQLLIST1, @@ -183,7 +183,7 @@ public void testWithoutCache() throws IOException null, null, true, - derbyFirehoseConnector, + testUtils.getDerbyFirehoseConnector(), mapper ); @@ -198,16 +198,17 @@ public void testWithoutCache() throws IOException assertResult(rows, SQLLIST1); assertNumRemainingCacheFiles(firehoseTmpDir, 0); - testUtils.dropTable(TABLE_NAME_1, derbyConnector); + testUtils.dropTable(TABLE_NAME_1); } @Test public void testWithCacheAndFetch() throws IOException { - SqlTestUtils testUtils = new SqlTestUtils(); - testUtils.createAndUpdateTable(TABLE_NAME_1, derbyConnector, 10); - testUtils.createAndUpdateTable(TABLE_NAME_2, derbyConnector, 10); + derbyConnector = 
derbyConnectorRule.getConnector(); + SqlTestUtils testUtils = new SqlTestUtils(derbyConnector); + testUtils.createAndUpdateTable(TABLE_NAME_1, 10); + testUtils.createAndUpdateTable(TABLE_NAME_2, 10); final SqlFirehoseFactory factory = new SqlFirehoseFactory( @@ -217,7 +218,7 @@ public void testWithCacheAndFetch() throws IOException 0L, null, true, - derbyFirehoseConnector, + testUtils.getDerbyFirehoseConnector(), mapper ); @@ -231,8 +232,8 @@ public void testWithCacheAndFetch() throws IOException assertResult(rows, SQLLIST2); assertNumRemainingCacheFiles(firehoseTmpDir, 2); - testUtils.dropTable(TABLE_NAME_1, derbyConnector); - testUtils.dropTable(TABLE_NAME_2, derbyConnector); + testUtils.dropTable(TABLE_NAME_1); + testUtils.dropTable(TABLE_NAME_2); } private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector From 62dc00b2eb9597e33a2bdb37642b94197d36f3f9 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Mon, 1 Jun 2020 15:16:21 -0500 Subject: [PATCH 07/11] Add additional tests --- docs/ingestion/native-batch.md | 6 +-- .../druid/metadata/input/SqlEntity.java | 48 +++++++++++------ .../druid/metadata/input/SqlReader.java | 21 +------- .../druid/metadata/input/SqlEntityTest.java | 54 ++++++++++++++++++- website/.spelling | 3 +- 5 files changed, 91 insertions(+), 41 deletions(-) diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 39e12b0ac176..18165a272e21 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -1340,19 +1340,19 @@ An example SqlInputSource spec is shown below: "password": "password" } }, - "sqls": ["SELECT * FROM table1", "SELECT * FROM table2"] + "sqls": ["SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"] }, ... ``` -The spec above will read all events from two separate SQLs within the interval `2013-01-01/2013-01-02`. +The spec above will read all events from two separate SQLs for the interval `2013-01-01/2013-01-02`. Each of the SQL queries will be run in its own sub-task and thus for the above example, there would be two sub-tasks. Compared to the other native batch InputSources, SQL InputSource behaves differently in terms of reading the input data and so it would be helpful to consider the following points before using this InputSource in a production environment: * During indexing, each sub-task would execute one of the SQL queries and the results are stored locally on disk. The sub-tasks then proceed to read the data from these local input files and generate segments. Presently, there isn’t any restriction on the size of the generated files and this would require the MiddleManagers or Indexers to have sufficient disk capacity based on the volume of data being indexed. -* Filtering the SQL queries based on the intervals specified in the `granularitySpec` can avoid unwanted data being retrieved and stored locally by the indexing sub-tasks. +* Filtering the SQL queries based on the intervals specified in the `granularitySpec` can avoid unwanted data being retrieved and stored locally by the indexing sub-tasks. For example, if the `intervals` specified in the `granularitySpec` is `["2013-01-01/2013-01-02"]` and the SQL query is `SELECT * FROM table1`, `SqlInputSource` will read all the data for `table1` based on the query, even though only data between the intervals specified will be indexed into Druid. 
* Pagination may be used on the SQL queries to ensure that each query pulls a similar amount of data, thereby improving the efficiency of the sub-tasks.

* Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`.

diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
index 647de9000fcb..1cabfbed4516 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
@@ -96,6 +96,18 @@ public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws I

   }

+  /**
+   * Executes a SQL query on the specified database and fetches the result into the given file.
+   * The result file is deleted if the query execution or the file write fails.
+   *
+   * @param sql                          The SQL query to be executed
+   * @param sqlFirehoseDatabaseConnector The database connector
+   * @param objectMapper                 An object mapper, used to serialize the query results into the file
+   * @param foldCase                     A boolean flag used to enable or disable case folding of database column names
+   *
+   * @return A {@link org.apache.druid.data.input.InputEntity.CleanableFile} object that wraps the file containing the SQL results
+   */
+
   public static CleanableFile openCleanableFile(
       String sql,
       SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector,
@@ -105,8 +117,8 @@ public static CleanableFile openCleanableFile(
   )
       throws IOException
   {
-    try (FileOutputStream fos = new FileOutputStream(tempFile)) {
-      final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos);
+    try (FileOutputStream fos = new FileOutputStream(tempFile);
+         final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos);) {

       // Execute the sql query and lazily retrieve the results into the file in json format.
       // foldCase is useful to handle differences in case sensitivity behavior across databases.
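For reference, the call pattern that the tests exercise for `openCleanableFile` looks roughly like the sketch below. The `connector` and `mapper` parameters stand in for a configured `SQLFirehoseDatabaseConnector` and `ObjectMapper`, and the query is the one used by `SqlEntityTest`; this is an illustration, not part of the patch:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.input.SqlEntity;

import java.io.File;
import java.io.IOException;

public class SqlEntitySketch
{
  static void fetchOnce(SQLFirehoseDatabaseConnector connector, ObjectMapper mapper) throws IOException
  {
    File tmpFile = File.createTempFile("queryResults", null);
    InputEntity.CleanableFile results = SqlEntity.openCleanableFile(
        "SELECT timestamp, a, b FROM FOOS_TABLE", // the tests' query
        connector,
        mapper,
        true,    // foldCase: fold the returned column names to a single case
        tmpFile
    );
    try {
      // results.file() now holds the query output as a JSON array of row maps.
      System.out.println(results.file().length() + " bytes fetched");
    }
    finally {
      results.close(); // close() deletes the temporary result file
    }
  }
}
```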
@@ -154,23 +166,29 @@ public static CleanableFile openCleanableFile(
             return sqlFirehoseDatabaseConnector.isTransientException(exception) && !(isStatementException);
           }
       );
-    }

-    return new CleanableFile()
-    {
-      @Override
-      public File file()
+      return new CleanableFile()
       {
-        return tempFile;
-      }
+        @Override
+        public File file()
+        {
+          return tempFile;
+        }

-      @Override
-      public void close()
-      {
-        if (!tempFile.delete()) {
-          LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath());
+        @Override
+        public void close()
+        {
+          if (!tempFile.delete()) {
+            LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath());
+          }
         }
+      };
+    }
+    catch (Exception e) {
+      if (!tempFile.delete()) {
+        LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath());
       }
-    };
+      throw new IOException(e);
+    }
   }

   private static class CaseFoldedMap extends HashMap<String, Object>
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
index e4b900835f50..4657158c0463 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
@@ -72,26 +72,7 @@ protected CloseableIterator<Map<String, Object>> intermediateRowIterator() throws IOException
     JsonIterator<Map<String, Object>> jsonIterator = new JsonIterator<>(new TypeReference<Map<String, Object>>()
     {
     }, inputStream, closer, objectMapper);
-    return new CloseableIterator<Map<String, Object>>()
-    {
-      @Override
-      public void close() throws IOException
-      {
-        jsonIterator.close();
-      }
-
-      @Override
-      public boolean hasNext()
-      {
-        return jsonIterator.hasNext();
-      }
-
-      @Override
-      public Map<String, Object> next()
-      {
-        return jsonIterator.next();
-      }
-    };
+    return jsonIterator;
   }

   @Override
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java
index 50a0288d26c2..46a171b50c92 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlEntityTest.java
@@ -40,11 +40,13 @@ public class SqlEntityTest
 {
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
   private final ObjectMapper mapper = TestHelper.makeSmileMapper();
   private TestDerbyConnector derbyConnector;
   String TABLE_NAME_1 = "FOOS_TABLE";

-  String TEST_SQL = "SELECT timestamp,a,b FROM FOOS_TABLE";
+  String VALID_SQL = "SELECT timestamp,a,b FROM FOOS_TABLE";
+  String INVALID_SQL = "DONT SELECT timestamp,a,b FROM FOOS_TABLE";
   String resultJson = "[{\"a\":\"0\","
                       + "\"b\":\"0\","
                       + "\"timestamp\":\"2011-01-12T00:00:00.000Z\""
@@ -69,7 +71,7 @@ public void testExecuteQuery() throws IOException
         ""
     );
     InputEntity.CleanableFile queryResult = SqlEntity.openCleanableFile(
-        TEST_SQL,
+        VALID_SQL,
         testUtils.getDerbyFirehoseConnector(),
         mapper,
         true,
         tmpFile
     );
@@ -81,4 +83,52 @@ public void testExecuteQuery() throws IOException
     String actualJson = IOUtils.toString(queryInputStream, StandardCharsets.UTF_8);
     Assert.assertEquals(actualJson, resultJson);
     testUtils.dropTable(TABLE_NAME_1);
   }
+
+  @Test(expected = IOException.class)
+  public void testFailOnInvalidQuery() throws IOException
+  {
+    derbyConnector = derbyConnectorRule.getConnector();
+    SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
+    testUtils.createAndUpdateTable(TABLE_NAME_1, 1);
+    File tmpFile = File.createTempFile(
+        "testQueryResults",
+        ""
+    );
+    InputEntity.CleanableFile queryResult = SqlEntity.openCleanableFile(
+        INVALID_SQL,
+        testUtils.getDerbyFirehoseConnector(),
+        mapper,
+        true,
+        tmpFile
+    );
+
+    Assert.assertTrue(tmpFile.exists());
+  }
+
+  @Test
+  public void testFileDeleteOnInvalidQuery() throws IOException
+  {
+    // The test parameters here are the same as those used for testFailOnInvalidQuery().
+    // The only difference is that this test checks if the temporary file is deleted upon failure.
+    derbyConnector = derbyConnectorRule.getConnector();
+    SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
+    testUtils.createAndUpdateTable(TABLE_NAME_1, 1);
+    File tmpFile = File.createTempFile(
+        "testQueryResults",
+        ""
+    );
+    try {
+      SqlEntity.openCleanableFile(
+          INVALID_SQL,
+          testUtils.getDerbyFirehoseConnector(),
+          mapper,
+          true,
+          tmpFile
+      );
+      // The invalid query should have thrown; fail the test if it did not.
+      Assert.fail("Expected an IOException from the invalid query");
+    }
+    // Let's catch the exception so as to verify the temporary file deletion.
+    catch (IOException e) {
+      Assert.assertFalse(tmpFile.exists());
+    }
+  }
 }
diff --git a/website/.spelling b/website/.spelling
index 46e2402673ac..e42a9cdadf53 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -98,6 +98,7 @@ IndexTask
 InfluxDB
 InputFormat
 InputSource
+InputSources
 Integer.MAX_VALUE
 JBOD
 JDBC
@@ -1752,4 +1753,4 @@ UserGroupInformation
 CVE-2019-17571
 CVE-2019-12399
 CVE-2018-17196
-bin.tar.gz
\ No newline at end of file
+bin.tar.gz

From 53cb148570ccb48903a3f6d587287bdc9220f0fc Mon Sep 17 00:00:00 2001
From: Atul Mohan
Date: Mon, 1 Jun 2020 16:14:36 -0500
Subject: [PATCH 08/11] Fix inspection

---
 .../main/java/org/apache/druid/metadata/input/SqlEntity.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
index 1cabfbed4516..d45a470a4369 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
@@ -105,7 +105,7 @@ public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws I
    * @param objectMapper                 An object mapper, used to serialize the query results into the file
    * @param foldCase                     A boolean flag used to enable or disable case folding of database column names
    *
-   * @return A {@link org.apache.druid.data.input.InputEntity.CleanableFile} object that wraps the file containing the SQL results
+   * @return A {@link InputEntity.CleanableFile} object that wraps the file containing the SQL results
    */

 public static CleanableFile openCleanableFile(

From eefc29262357e2e4bb5025b0e4a13bcb64931e8a Mon Sep 17 00:00:00 2001
From: Atul Mohan
Date: Thu, 4 Jun 2020 12:18:29 -0500
Subject: [PATCH 09/11] Add module test

---
 docs/ingestion/native-batch.md                |  2 +
 .../SQLMetadataStorageActionHandler.java      |  2 +-
 .../druid/metadata/input/SqlEntity.java       | 17 ++---
 .../metadata/input/InputSourceModuleTest.java | 62 +++++++++++++++
 4 files changed, 71 insertions(+), 12 deletions(-)
 create mode 100644 server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java

diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 18165a272e21..45e12ed27569 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1316,6 +1316,7 @@
 The SQL input source is used to read data directly from RDBMS.
 The SQL input source is _splittable_ and can be used by the [Parallel task](#parallel-task), where each worker task will read from one SQL query from the list of queries.
Since this input source has a fixed input format for reading events, no `inputFormat` field needs to be specified in the ingestion spec when using this input source. +Please refer to the Recommended practices section below before using this input source. |property|description|required?| |--------|-----------|---------| @@ -1348,6 +1349,7 @@ An example SqlInputSource spec is shown below: The spec above will read all events from two separate SQLs for the interval `2013-01-01/2013-01-02`. Each of the SQL queries will be run in its own sub-task and thus for the above example, there would be two sub-tasks. +** Recommended practices ** Compared to the other native batch InputSources, SQL InputSource behaves differently in terms of reading the input data and so it would be helpful to consider the following points before using this InputSource in a production environment: * During indexing, each sub-task would execute one of the SQL queries and the results are stored locally on disk. The sub-tasks then proceed to read the data from these local input files and generate segments. Presently, there isn’t any restriction on the size of the generated files and this would require the MiddleManagers or Indexers to have sufficient disk capacity based on the volume of data being indexed. diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index eb9e9916cac8..2b615df65ad0 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -174,7 +174,7 @@ public void insert( } @VisibleForTesting - protected static boolean isStatementException(Throwable e) + public static boolean isStatementException(Throwable e) { return e instanceof StatementException || (e instanceof CallbackFailedException && e.getCause() instanceof StatementException); diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java index d45a470a4369..724077a4c0d2 100644 --- a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java @@ -26,10 +26,9 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.apache.druid.metadata.SQLMetadataStorageActionHandler; import org.skife.jdbi.v2.ResultIterator; -import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.exceptions.ResultSetException; -import org.skife.jdbi.v2.exceptions.StatementException; import javax.annotation.Nullable; import java.io.File; @@ -159,12 +158,8 @@ public static CleanableFile openCleanableFile( jg.close(); return null; }, - (exception) -> { - final boolean isStatementException = exception instanceof StatementException || - (exception instanceof CallbackFailedException - && exception.getCause() instanceof StatementException); - return sqlFirehoseDatabaseConnector.isTransientException(exception) && !(isStatementException); - } + (exception) -> sqlFirehoseDatabaseConnector.isTransientException(exception) + && !(SQLMetadataStorageActionHandler.isStatementException(exception)) ); return new CleanableFile() { @@ -198,19 +193,19 @@ private static class CaseFoldedMap extends HashMap @Override public 
Object get(Object obj) { - return super.get(StringUtils.toLowerCase((String) obj)); + return super.get(obj == null ? null : StringUtils.toLowerCase((String) obj)); } @Override public Object put(String key, Object value) { - return super.put(StringUtils.toLowerCase(key), value); + return super.put(key == null ? null : StringUtils.toLowerCase(key), value); } @Override public boolean containsKey(Object obj) { - return super.containsKey(StringUtils.toLowerCase((String) obj)); + return super.containsKey(obj == null ? null : StringUtils.toLowerCase((String) obj)); } } } diff --git a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java new file mode 100644 index 000000000000..67126b0c7b2f --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.cfg.MapperConfig; +import com.fasterxml.jackson.databind.introspect.AnnotatedClass; +import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class InputSourceModuleTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + private final String SQL_NAMED_TYPE = "sql"; + + @Before + public void setUp() + { + for (Module jacksonModule : new InputSourceModule().getJacksonModules()) { + mapper.registerModule(jacksonModule); + } + } + + @Test + public void testSubTypeRegistration() + { + MapperConfig config = mapper.getDeserializationConfig(); + AnnotatedClass annotatedClass = AnnotatedClassResolver.resolveWithoutSuperTypes(config, SqlInputSource.class); + List subtypes = mapper.getSubtypeResolver() + .collectAndResolveSubtypesByClass(config, annotatedClass) + .stream() + .map(NamedType::getName) + .collect(Collectors.toList()); + Assert.assertNotNull(subtypes); + Assert.assertEquals(SQL_NAMED_TYPE, Iterables.getOnlyElement(subtypes)); + } +} From fe731f78c8ffb97a9d072e33da392cf4a783cc2c Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Thu, 4 Jun 2020 13:09:23 -0500 Subject: [PATCH 10/11] Fix md in docs --- docs/ingestion/native-batch.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 45e12ed27569..981f9f700938 100644 --- 
a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -1349,7 +1349,8 @@ An example SqlInputSource spec is shown below: The spec above will read all events from two separate SQLs for the interval `2013-01-01/2013-01-02`. Each of the SQL queries will be run in its own sub-task and thus for the above example, there would be two sub-tasks. -** Recommended practices ** +**Recommended practices** + Compared to the other native batch InputSources, SQL InputSource behaves differently in terms of reading the input data and so it would be helpful to consider the following points before using this InputSource in a production environment: * During indexing, each sub-task would execute one of the SQL queries and the results are stored locally on disk. The sub-tasks then proceed to read the data from these local input files and generate segments. Presently, there isn’t any restriction on the size of the generated files and this would require the MiddleManagers or Indexers to have sufficient disk capacity based on the volume of data being indexed. From 9550f53ef3616ad7cb996a60782b462a706e3e07 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Fri, 5 Jun 2020 09:07:46 -0500 Subject: [PATCH 11/11] Remove annotation --- .../apache/druid/metadata/SQLMetadataStorageActionHandler.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 2b615df65ad0..e6fdcfb45f1a 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Maps; @@ -173,7 +172,6 @@ public void insert( } } - @VisibleForTesting public static boolean isStatementException(Throwable e) { return e instanceof StatementException ||