diff --git a/core/src/main/java/org/apache/druid/data/input/ColumnsFilter.java b/core/src/main/java/org/apache/druid/data/input/ColumnsFilter.java
new file mode 100644
index 000000000000..b01001f8eec1
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/ColumnsFilter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Used by some {@link InputSourceReader} implementations in order to know what columns will need to be read out
+ * of the {@link InputRow} objects they create.
+ *
+ * This is meant to be useful as an optimization: if we're reading from a columnar data format, then when a column
+ * isn't going to be needed, we shouldn't read it.
+ *
+ * @see InputSource#reader accepts objects of this class
+ */
+public abstract class ColumnsFilter
+{
+ /**
+ * Accepts all columns.
+ */
+ public static ColumnsFilter all()
+ {
+ return new ExclusionBased(Collections.emptySet());
+ }
+
+ /**
+ * Accepts only a specific set of columns.
+ */
+ public static ColumnsFilter inclusionBased(final Set<String> inclusions)
+ {
+ return new InclusionBased(inclusions);
+ }
+
+ /**
+ * Accepts all columns, except those in a specific set.
+ */
+ public static ColumnsFilter exclusionBased(final Set<String> exclusions)
+ {
+ return new ExclusionBased(exclusions);
+ }
+
+ /**
+ * Checks whether a column should be included.
+ */
+ public abstract boolean apply(String column);
+
+ /**
+ * Returns a new filter with a particular column added. The returned filter will return true from {@link #apply}
+ * on this column.
+ */
+ public abstract ColumnsFilter plus(String column);
+
+ public static class InclusionBased extends ColumnsFilter
+ {
+ private final Set<String> inclusions;
+
+ private InclusionBased(Set<String> inclusions)
+ {
+ this.inclusions = inclusions;
+ }
+
+ @Override
+ public boolean apply(String column)
+ {
+ return inclusions.contains(column);
+ }
+
+ @Override
+ public ColumnsFilter plus(String column)
+ {
+ if (inclusions.contains(column)) {
+ return this;
+ } else {
+ final Set<String> copy = new HashSet<>(inclusions);
+ copy.add(column);
+ return new InclusionBased(copy);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InclusionBased that = (InclusionBased) o;
+ return Objects.equals(inclusions, that.inclusions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(inclusions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ColumnsFilter.InclusionBased{" +
+ "inclusions=" + inclusions +
+ '}';
+ }
+ }
+
+ public static class ExclusionBased extends ColumnsFilter
+ {
+ private final Set<String> exclusions;
+
+ public ExclusionBased(Set<String> exclusions)
+ {
+ this.exclusions = exclusions;
+ }
+
+ @Override
+ public boolean apply(String column)
+ {
+ return !exclusions.contains(column);
+ }
+
+ @Override
+ public ColumnsFilter plus(String column)
+ {
+ if (!exclusions.contains(column)) {
+ return this;
+ } else {
+ final Set<String> copy = new HashSet<>(exclusions);
+ copy.remove(column);
+ return new ExclusionBased(copy);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExclusionBased that = (ExclusionBased) o;
+ return Objects.equals(exclusions, that.exclusions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(exclusions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ColumnsFilter.ExclusionBased{" +
+ "exclusions=" + exclusions +
+ '}';
+ }
+ }
+}
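For reference (not part of the patch): a minimal sketch of the semantics of the three factory methods above, using only the API this file defines.

```java
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;

public class ColumnsFilterExample
{
  public static void main(String[] args)
  {
    final ColumnsFilter inclusion = ColumnsFilter.inclusionBased(ImmutableSet.of("b"));
    final ColumnsFilter exclusion = ColumnsFilter.exclusionBased(ImmutableSet.of("b"));

    // inclusionBased: only the listed columns pass.
    System.out.println(inclusion.apply("a")); // false
    System.out.println(inclusion.apply("b")); // true

    // exclusionBased: everything except the listed columns passes.
    System.out.println(exclusion.apply("a")); // true
    System.out.println(exclusion.apply("b")); // false

    // plus(column) guarantees apply(column) == true on the returned filter:
    // it adds to an inclusion set, but removes from an exclusion set.
    System.out.println(inclusion.plus("a").apply("a")); // true
    System.out.println(exclusion.plus("b").apply("b")); // true

    // ColumnsFilter.all() accepts everything.
    System.out.println(ColumnsFilter.all().apply("anything")); // true
  }
}
```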
diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
index c908187962e7..227bd3a6d198 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
@@ -22,8 +22,6 @@
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
-import java.util.List;
-
/**
* Schema of {@link InputRow}.
*/
@@ -31,13 +29,17 @@ public class InputRowSchema
{
private final TimestampSpec timestampSpec;
private final DimensionsSpec dimensionsSpec;
- private final List<String> metricNames;
+ private final ColumnsFilter columnsFilter;
- public InputRowSchema(TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec, List<String> metricNames)
+ public InputRowSchema(
+ final TimestampSpec timestampSpec,
+ final DimensionsSpec dimensionsSpec,
+ final ColumnsFilter columnsFilter
+ )
{
this.timestampSpec = timestampSpec;
this.dimensionsSpec = dimensionsSpec;
- this.metricNames = metricNames;
+ this.columnsFilter = columnsFilter;
}
public TimestampSpec getTimestampSpec()
@@ -50,8 +52,17 @@ public DimensionsSpec getDimensionsSpec()
return dimensionsSpec;
}
- public List<String> getMetricNames()
+ /**
+ * A {@link ColumnsFilter} that can filter down the list of columns that must be read after flattening.
+ *
+ * Logically, Druid applies ingestion spec components in a particular order: first flattenSpec (if any), then
+ * timestampSpec, then transformSpec, and finally dimensionsSpec and metricsSpec.
+ *
+ * If a flattenSpec is provided, the returned filter applies after flattening, so it reflects what needs to pass
+ * between the flattenSpec and everything downstream of it.
+ */
+ public ColumnsFilter getColumnsFilter()
{
- return metricNames;
+ return columnsFilter;
}
}
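For reference (not part of the patch): a hypothetical call site showing the new constructor shape. Most call sites updated in this diff simply pass `ColumnsFilter.all()`; the inclusion set below is illustrative.

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;

class InputRowSchemaExample
{
  static InputRowSchema exampleSchema()
  {
    return new InputRowSchema(
        new TimestampSpec("ts", "auto", null),
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "user"))),
        // Readers now receive a filter instead of a list of metric names; a
        // columnar reader may skip any column for which apply() returns false.
        ColumnsFilter.inclusionBased(ImmutableSet.of("ts", "page", "user", "added"))
    );
  }
}
```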
diff --git a/core/src/main/java/org/apache/druid/data/input/InputSource.java b/core/src/main/java/org/apache/druid/data/input/InputSource.java
index ba0224e8b0b6..0a3cda250f43 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputSource.java
@@ -78,5 +78,9 @@ public interface InputSource
* @param inputFormat to parse data. It can be null if {@link #needsFormat()} = true
* @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished.
*/
- InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory);
+ InputSourceReader reader(
+ InputRowSchema inputRowSchema,
+ @Nullable InputFormat inputFormat,
+ File temporaryDirectory
+ );
}
diff --git a/core/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java b/core/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java
index 59ab8a55710c..e9117f26911a 100644
--- a/core/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java
+++ b/core/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java
@@ -25,8 +25,10 @@
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
+ *
*/
@PublicApi
public class MapBasedInputRow extends MapBasedRow implements InputRow
@@ -59,6 +61,28 @@ public List<String> getDimensions()
return dimensions;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ MapBasedInputRow that = (MapBasedInputRow) o;
+ return Objects.equals(dimensions, that.dimensions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), dimensions);
+ }
+
@Override
public String toString()
{
diff --git a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
index bd8cbe1fe755..3def28293272 100644
--- a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
@@ -37,7 +37,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
@@ -71,7 +70,7 @@ public void testUnimplementedInputFormat() throws IOException
new InputRowSchema(
inputRowParser.getParseSpec().getTimestampSpec(),
inputRowParser.getParseSpec().getDimensionsSpec(),
- Collections.emptyList()
+ ColumnsFilter.all()
),
null,
null
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/ColumnsFilterTest.java b/core/src/test/java/org/apache/druid/data/input/impl/ColumnsFilterTest.java
new file mode 100644
index 000000000000..00faf4ea5324
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/ColumnsFilterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ColumnsFilterTest
+{
+ private static final List<String> COLUMNS = ImmutableList.of("a", "b", "c");
+
+ @Test
+ public void testAll()
+ {
+ Assert.assertEquals(
+ ImmutableList.of("a", "b", "c"),
+ apply(ColumnsFilter.all(), COLUMNS)
+ );
+ }
+
+ @Test
+ public void testInclusionBased()
+ {
+ Assert.assertEquals(
+ ImmutableList.of("b"),
+ apply(ColumnsFilter.inclusionBased(ImmutableSet.of("b")), COLUMNS)
+ );
+ }
+
+ @Test
+ public void testInclusionBasedPlus()
+ {
+ Assert.assertEquals(
+ ColumnsFilter.inclusionBased(ImmutableSet.of("a", "b", "c")),
+ ColumnsFilter.inclusionBased(ImmutableSet.of("b", "c")).plus("a").plus("c")
+ );
+ }
+
+ @Test
+ public void testExclusionBased()
+ {
+ Assert.assertEquals(
+ ImmutableList.of("a", "c"),
+ apply(ColumnsFilter.exclusionBased(ImmutableSet.of("b")), COLUMNS)
+ );
+ }
+
+ @Test
+ public void testExclusionBasedPlus()
+ {
+ Assert.assertEquals(
+ ColumnsFilter.exclusionBased(ImmutableSet.of("b")),
+ ColumnsFilter.exclusionBased(ImmutableSet.of("b", "c")).plus("a").plus("c")
+ );
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(ColumnsFilter.InclusionBased.class).usingGetClass().verify();
+ EqualsVerifier.forClass(ColumnsFilter.ExclusionBased.class).usingGetClass().verify();
+ }
+
+ private List<String> apply(ColumnsFilter columnsFilter, List<String> columns)
+ {
+ return columns.stream().filter(columnsFilter::apply).collect(Collectors.toList());
+ }
+}
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java
index ec942379f3b2..c1faa274845c 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java
@@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@@ -37,7 +38,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -47,7 +47,7 @@ public class CsvReaderTest
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
- Collections.emptyList()
+ ColumnsFilter.all()
);
@BeforeClass
@@ -229,7 +229,7 @@ public void testQuotes() throws IOException
new InputRowSchema(
new TimestampSpec("Timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java
index e590ed566a93..c98d8fff6a85 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@@ -35,7 +36,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -44,7 +44,7 @@ public class DelimitedReaderTest
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
- Collections.emptyList()
+ ColumnsFilter.all()
);
@BeforeClass
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
index e202d152047a..37b35f149829 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.DateTimes;
@@ -37,7 +38,6 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
public class InputEntityIteratingReaderTest
@@ -64,7 +64,7 @@ public void test() throws IOException
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "name", "score"))
),
- Collections.emptyList()
+ ColumnsFilter.all()
),
new CsvInputFormat(
ImmutableList.of("time", "name", "score"),
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java
index 3a400b5456b5..c61ad4921bb7 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@@ -66,7 +67,7 @@ public void testParseRow() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -116,7 +117,7 @@ public void testParseRowWithConditional() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -158,7 +159,7 @@ public void testParseRowKeepNullColumns() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -200,7 +201,7 @@ public void testKeepNullColumnsWithNoNullValues() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -242,7 +243,7 @@ public void testFalseKeepNullColumns() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
index f554034469f0..7ab52a095d51 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -38,7 +39,6 @@
import org.junit.rules.ExpectedException;
import java.io.IOException;
-import java.util.Collections;
public class JsonReaderTest
{
@@ -75,7 +75,7 @@ public void testParseMultipleRows() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -141,7 +141,7 @@ public void testParsePrettyFormatJSON() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -194,7 +194,8 @@ public void testInvalidJSONText() throws IOException
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
- + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}" //baz property is illegal
+ + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}"
+ //baz property is illegal
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}")
);
@@ -202,7 +203,7 @@ public void testInvalidJSONText() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -255,7 +256,7 @@ public void testSampleMultipleRows() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -314,7 +315,8 @@ public void testSamplInvalidJSONText() throws IOException
//2nd row is ill-formed
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
- + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n" //value of baz is invalid
+ + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n"
+ //value of baz is invalid
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n")
);
@@ -322,7 +324,7 @@ public void testSamplInvalidJSONText() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -377,7 +379,7 @@ public void testEmptyJSONText() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
@@ -431,7 +433,7 @@ public void testSampleEmptyText() throws IOException
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
- Collections.emptyList()
+ ColumnsFilter.all()
),
source,
null
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 18c23da6be0c..5a91a772b861 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1286,6 +1286,7 @@ Additional peon configs include:
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|`druid.indexer.task.restoreTasksOnRestart`|If true, MiddleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
+|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
If the peon is running in remote mode, there must be an Overlord up and running. Peons in remote mode can set the following configurations:
@@ -1350,6 +1351,7 @@ then the value from the configuration below is used:
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|`druid.indexer.task.restoreTasksOnRestart`|If true, the Indexer will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
+|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.peon.taskActionClient.retry.minWait`|The minimum retry time to communicate with Overlord.|PT5S|
|`druid.peon.taskActionClient.retry.maxWait`|The maximum retry time to communicate with Overlord.|PT1M|
|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of retries to communicate with Overlord.|60|
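For reference (not part of the patch): enabling the compatibility mode described above amounts to a single line in the middleManager or Indexer `runtime.properties`.

```properties
# Revert to the pre-0.22.0 behavior: ignore the timestampSpec for Druid input sources.
druid.indexer.task.ignoreTimestampSpecForDruidInputSource=true
```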
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 5032ed1a518c..dece5bf260c4 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1292,60 +1292,82 @@ no `inputFormat` field needs to be specified in the ingestion spec when using th
|type|This should be "druid".|yes|
|dataSource|A String defining the Druid datasource to fetch rows from|yes|
|interval|A String representing an ISO-8601 interval, which defines the time range to fetch the data over.|yes|
-|dimensions|A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned. |no|
-|metrics|The list of Strings containing the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no|
|filter| See [Filters](../querying/filters.md). Only rows that match the filter, if specified, will be returned.|no|
-A minimal example DruidInputSource spec is shown below:
+The Druid input source can be used for a variety of purposes, including:
-```json
-...
- "ioConfig": {
- "type": "index_parallel",
- "inputSource": {
- "type": "druid",
- "dataSource": "wikipedia",
- "interval": "2013-01-01/2013-01-02"
- }
- ...
- },
-...
-```
+- Creating new datasources that are rolled-up copies of existing datasources.
+- Changing the [partitioning or sorting](index.md#partitioning) of a datasource to improve performance.
+- Updating or removing rows using a [`transformSpec`](index.md#transformspec).
-The spec above will read all existing dimension and metric columns from
-the `wikipedia` datasource, including all rows with a timestamp (the `__time` column)
-within the interval `2013-01-01/2013-01-02`.
+When using the Druid input source, the timestamp column shows up as a numeric field named `__time` set to the number
+of milliseconds since the epoch (January 1, 1970 00:00:00 UTC). It is common to use this in the timestampSpec if you
+want the output timestamp to be equivalent to the input timestamp: set the timestamp column to `__time` and the
+format to `auto` or `millis`.
-A spec that applies a filter and reads a subset of the original datasource's columns is shown below.
+It is OK for the input and output datasources to be the same. In this case, newly generated data will overwrite the
+previous data for the intervals specified in the `granularitySpec`. Generally, if you are going to do this, it is a
+good idea to test out your reindexing by writing to a separate datasource before overwriting your main one.
+Alternatively, if your goals can be satisfied by [compaction](compaction.md), consider that instead as a simpler
+approach.
+
+An example task spec is shown below. It reads from a hypothetical raw datasource `wikipedia_raw` and creates a new
+rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and "page".
```json
-...
+{
+ "type": "index_parallel",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "wikipedia_rollup",
+ "timestampSpec": {
+ "column": "__time",
+ "format": "millis"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "countryName",
+ "page"
+ ]
+ },
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "cnt"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "queryGranularity": "HOUR",
+ "segmentGranularity": "DAY",
+ "intervals": ["2016-06-27/P1D"],
+ "rollup": true
+ }
+ },
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "druid",
- "dataSource": "wikipedia",
- "interval": "2013-01-01/2013-01-02",
- "dimensions": [
- "page",
- "user"
- ],
- "metrics": [
- "added"
- ],
- "filter": {
- "type": "selector",
- "dimension": "page",
- "value": "Druid"
- }
+ "dataSource": "wikipedia_raw",
+ "interval": "2016-06-27/P1D"
}
- ...
},
-...
+ "tuningConfig": {
+ "type": "index_parallel",
+ "partitionsSpec": {
+ "type": "hashed"
+ },
+ "forceGuaranteedRollup": true,
+ "maxNumConcurrentSubTasks": 1
+ }
+ }
+}
```
-This spec above will only return the `page`, `user` dimensions and `added` metric.
-Only rows where `page` = `Druid` will be returned.
+> Note: Older versions (0.19 and earlier) did not respect the timestampSpec when using the Druid input source. If you
+> have ingestion specs that rely on this and cannot rewrite them, set
+> [`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`](../configuration/index.md#indexer-general-configuration)
+> to `true` to enable a compatibility mode where the timestampSpec is ignored.
### SQL Input Source
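For reference (not part of the patch): although the `dimensions` and `metrics` properties are removed from the table above, row-level filtering remains available via `filter`. A minimal sketch, reusing the selector filter from the deleted example:

```json
"inputSource": {
  "type": "druid",
  "dataSource": "wikipedia_raw",
  "interval": "2016-06-27/P1D",
  "filter": {
    "type": "selector",
    "dimension": "page",
    "value": "Druid"
  }
}
```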
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
index d38bc6627d98..b7e95402e234 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
@@ -39,6 +39,7 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
@@ -111,7 +112,8 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
private static final OssClientConfig CLOUD_CONFIG_PROPERTIES = new OssClientConfig(
"test.oss-cn.aliyun.com",
new DefaultPasswordProvider("myKey"),
- new DefaultPasswordProvider("mySecret"));
+ new DefaultPasswordProvider("mySecret")
+ );
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
@@ -454,7 +456,7 @@ public void testReader() throws IOException
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
- ImmutableList.of("count")
+ ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
@@ -497,7 +499,7 @@ public void testCompressedReader() throws IOException
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
- ImmutableList.of("count")
+ ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
index da586d8ed635..4841483c4e1a 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
@@ -27,6 +27,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.AvroHadoopInputRowParserTest;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -199,10 +200,9 @@ private InputEntityReader createReader(
final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
final DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
"eventType")));
- final List<String> metricNames = ImmutableList.of("someLong");
final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null);
- final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, metricNames);
+ final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
final FileEntity entity = new FileEntity(someAvroFile);
return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
index e68a9a44a386..4b42efe0d61c 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
@@ -31,6 +31,7 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
@@ -226,7 +227,7 @@ public void testReader() throws IOException
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
- ImmutableList.of("count")
+ ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
@@ -269,7 +270,7 @@ public void testCompressedReader() throws IOException
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
- ImmutableList.of("count")
+ ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index a61a0c6de950..96a99ad6fdfe 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@@ -76,7 +77,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
- Collections.emptyList()
+ ColumnsFilter.all()
);
private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
Arrays.asList(TimestampSpec.DEFAULT_COLUMN, COLUMN),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 992bb19b6265..3d9dc1c4242a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2781,7 +2781,8 @@ private void makeToolboxFactory() throws IOException
true,
null,
null,
- null
+ null,
+ false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 302a4a089dac..2fe4d333d495 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2867,7 +2867,8 @@ private void makeToolboxFactory() throws IOException
true,
null,
null,
- null
+ null,
+ false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();
diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
index bef9b64728f4..9726c0e14671 100644
--- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
+++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
@@ -259,7 +260,7 @@ private InputEntityReader createReader(
String dataFile
) throws IOException
{
- final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, Collections.emptyList());
+ final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
final FileEntity entity = new FileEntity(new File(dataFile));
return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
}
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java
index f8b586bec67d..60173212b53c 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -34,7 +35,6 @@
import org.junit.Test;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
/**
@@ -49,7 +49,7 @@ public void testBinaryAsString() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("field"))),
- ImmutableList.of()
+ ColumnsFilter.all()
);
InputEntityReader reader = createReader(
file,
@@ -114,7 +114,7 @@ public void testParquet1217() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- ImmutableList.of("metric1")
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "col", "col"),
@@ -200,7 +200,7 @@ required group nestedIntsColumn (LIST) {
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- Collections.emptyList()
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractByLogicalMap", "$.intToStringColumn.1"),
@@ -315,7 +315,7 @@ public void testOldRepeatedInt() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("repeatedInt"))),
- Collections.emptyList()
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "repeatedInt", "repeatedInt")
@@ -353,7 +353,7 @@ public void testReadNestedArrayStruct() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec", "extracted1", "extracted2"))),
- Collections.emptyList()
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extracted1", "$.myComplex[0].id"),
@@ -395,7 +395,7 @@ public void testProtoStructWithArray() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- Collections.emptyList()
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedOptional", "$.optionalMessage.someId"),
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java
index 50b9fe2df27d..faa80e6d73f3 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -49,7 +50,7 @@ public void testReadParquetDecimalFixedLen() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("fixed_len_dec"))),
- ImmutableList.of("metric1")
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "fixed_len_dec", "fixed_len_dec"),
@@ -86,7 +87,7 @@ public void testReadParquetDecimali32() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec"))),
- ImmutableList.of("metric1")
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i32_dec"),
@@ -123,7 +124,7 @@ public void testReadParquetDecimali64() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i64_dec"))),
- ImmutableList.of("metric1")
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i64_dec"),
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java
index 5be38dda494d..7ff430667fa5 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -33,7 +34,6 @@
import org.junit.Test;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
/**
@@ -69,7 +69,7 @@ public void testFlat1NoFlattenSpec() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "listDim"))),
- ImmutableList.of("metric1", "metric2")
+ ColumnsFilter.all()
);
JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of());
InputEntityReader reader = createReader(
@@ -103,7 +103,7 @@ public void testFlat1Autodiscover() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- ImmutableList.of("metric1", "metric2")
+ ColumnsFilter.all()
);
InputEntityReader reader = createReader(
file,
@@ -136,7 +136,7 @@ public void testFlat1Flatten() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "list"))),
- ImmutableList.of("metric1", "metric2")
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@@ -177,7 +177,7 @@ public void testFlat1FlattenSelectListItem() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "listExtracted"))),
- ImmutableList.of("metric1", "metric2")
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@@ -217,7 +217,7 @@ public void testNested1NoFlattenSpec() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1"))),
- ImmutableList.of("metric1")
+ ColumnsFilter.all()
);
JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of());
InputEntityReader reader = createReader(
@@ -253,7 +253,7 @@ public void testNested1Autodiscover() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- ImmutableList.of("metric1", "metric2")
+ ColumnsFilter.all()
);
InputEntityReader reader = createReader(
file,
@@ -286,7 +286,7 @@ public void testNested1Flatten() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- ImmutableList.of("metric1", "metric2")
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@@ -329,7 +329,7 @@ public void testNested1FlattenSelectListItem() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- Collections.emptyList()
+ ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
index 251fa344bb73..f8e56b3f2deb 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@@ -39,7 +40,6 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collections;
import java.util.Objects;
public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
@@ -55,7 +55,7 @@ public void testFetchOnReadCleanupAfterExhaustingIterator() throws IOException
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))
),
- Collections.emptyList()
+ ColumnsFilter.all()
);
FetchingFileEntity entity = new FetchingFileEntity(new File("example/wiki/wiki.parquet"));
ParquetInputFormat parquet = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration());
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java
index 19f1544dcff0..c0189fe8bc19 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -31,7 +32,6 @@
import org.junit.Test;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
/**
@@ -46,12 +46,12 @@ public void testDateHandling() throws IOException
InputRowSchema schemaAsString = new InputRowSchema(
new TimestampSpec("date_as_string", "Y-M-d", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- Collections.emptyList()
+ ColumnsFilter.all()
);
InputRowSchema schemaAsDate = new InputRowSchema(
new TimestampSpec("date_as_date", null, null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- Collections.emptyList()
+ ColumnsFilter.all()
);
InputEntityReader readerAsString = createReader(
file,
@@ -104,7 +104,7 @@ public void testParseInt96Timestamp() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- Collections.emptyList()
+ ColumnsFilter.all()
);
InputEntityReader reader = createReader(file, schema, JSONPathSpec.DEFAULT);
@@ -130,7 +130,7 @@ public void testTimeMillisInInt64() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
- Collections.emptyList()
+ ColumnsFilter.all()
);
InputEntityReader reader = createReader(
file,
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java
index 75e5e916ec78..4bc7bac27b2e 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -31,7 +32,6 @@
import org.junit.Test;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
/**
@@ -45,7 +45,7 @@ public void testWiki() throws IOException
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))),
- Collections.emptyList()
+ ColumnsFilter.all()
);
InputEntityReader reader = createReader("example/wiki/wiki.parquet", schema, JSONPathSpec.DEFAULT);
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 9efaf4fd09c3..41a5f8a36c8b 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -40,6 +40,7 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
@@ -509,7 +510,7 @@ public void testReader() throws IOException
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
- ImmutableList.of("count")
+ ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
@@ -553,7 +554,7 @@ public void testCompressedReader() throws IOException
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
- ImmutableList.of("count")
+ ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java
index bd9d21457eb6..8714fa6933ac 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java
@@ -34,6 +34,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+/**
+ * @deprecated only used by {@link org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory}
+ */
+@Deprecated
public class ReingestionTimelineUtils
{
/**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 7c22dad5b62b..bf887e500e6e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -67,6 +67,9 @@ public class TaskConfig
@JsonProperty
private final List<StorageLocationConfig> shuffleDataLocations;
+ @JsonProperty
+ private final boolean ignoreTimestampSpecForDruidInputSource;
+
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@@ -77,7 +80,8 @@ public TaskConfig(
@JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
- @JsonProperty("shuffleDataLocations") List shuffleDataLocations
+ @JsonProperty("shuffleDataLocations") List shuffleDataLocations,
+ @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -102,6 +106,7 @@ public TaskConfig(
} else {
this.shuffleDataLocations = shuffleDataLocations;
}
+ this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
}
@JsonProperty
@@ -178,6 +183,12 @@ public List<StorageLocationConfig> getShuffleDataLocations()
return shuffleDataLocations;
}
+ @JsonProperty
+ public boolean isIgnoreTimestampSpecForDruidInputSource()
+ {
+ return ignoreTimestampSpecForDruidInputSource;
+ }
+
private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {
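[Editor's note] The new `ignoreTimestampSpecForDruidInputSource` flag is a primitive boolean creator property, so it defaults to false when absent from the task configuration (operators opt in via `druid.indexer.task.ignoreTimestampSpecForDruidInputSource`, the property named in the DruidInputSource warning later in this patch). A self-contained sketch of that Jackson defaulting behavior, using a hypothetical `MiniConfig` stand-in rather than the real `TaskConfig`:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BooleanDefaultExample
{
  // Hypothetical stand-in for the TaskConfig pattern above: a primitive
  // boolean creator property deserializes to false when the key is absent.
  static class MiniConfig
  {
    private final boolean ignoreTimestampSpecForDruidInputSource;

    @JsonCreator
    MiniConfig(
        @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource
    )
    {
      this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
    }

    boolean isSet()
    {
      return ignoreTimestampSpecForDruidInputSource;
    }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Key absent: primitive boolean defaults to false.
    System.out.println(mapper.readValue("{}", MiniConfig.class).isSet()); // false
    // Key present: value is honored.
    System.out.println(
        mapper.readValue("{\"ignoreTimestampSpecForDruidInputSource\":true}", MiniConfig.class).isSet()
    ); // true
  }
}
```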
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 6f7cdf68cfe6..72f150226644 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -26,7 +26,6 @@
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.indexer.TaskStatus;
@@ -42,6 +41,7 @@
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
@@ -49,7 +49,6 @@
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
@@ -66,7 +65,6 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -176,16 +174,9 @@ public static FilteringCloseableInputRowIterator inputSourceReader(
ParseExceptionHandler parseExceptionHandler
) throws IOException
{
- final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
- .map(AggregatorFactory::getName)
- .collect(Collectors.toList());
final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
inputSource.reader(
- new InputRowSchema(
- dataSchema.getTimestampSpec(),
- dataSchema.getDimensionsSpec(),
- metricsNames
- ),
+ InputRowSchemas.fromDataSchema(dataSchema),
inputFormat,
tmpDir
)
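[Editor's note] `InputRowSchemas.fromDataSchema` itself is not shown in this diff. A plausible reconstruction, inferred from the code it replaces: take the timestampSpec and dimensionsSpec straight from the `DataSchema`, and derive a `ColumnsFilter` from the columns ingestion actually reads, rather than the old plain metric-name list. The real helper may differ (for example, in how it handles transforms):

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;

public final class InputRowSchemasSketch
{
  private InputRowSchemasSketch() {}

  // Hypothetical reconstruction of InputRowSchemas.fromDataSchema.
  public static InputRowSchema fromDataSchema(final DataSchema dataSchema)
  {
    // Columns ingestion needs: the timestamp, the dimensions, and the
    // input fields of every aggregator.
    final Set<String> columns = new HashSet<>();
    columns.add(dataSchema.getTimestampSpec().getTimestampColumn());
    columns.addAll(dataSchema.getDimensionsSpec().getDimensionNames());
    for (AggregatorFactory aggregator : dataSchema.getAggregators()) {
      columns.addAll(aggregator.requiredFields());
    }

    return new InputRowSchema(
        dataSchema.getTimestampSpec(),
        dataSchema.getDimensionsSpec(),
        // Schemaless dimensions must read every column; otherwise restrict
        // reads to the columns computed above.
        dataSchema.getDimensionsSpec().getDimensionNames().isEmpty()
            ? ColumnsFilter.all()
            : ColumnsFilter.inclusionBased(columns)
    );
  }
}
```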
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 8234c351e6ce..6f69811f3e58 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -624,12 +624,13 @@ private static ParallelIndexIOConfig createIoConfig(
interval,
null,
null,
- dataSchema.getDimensionsSpec().getDimensionNames(),
- Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
+ null,
+ null,
toolbox.getIndexIO(),
coordinatorClient,
segmentLoaderFactory,
- retryPolicyFactory
+ retryPolicyFactory,
+ toolbox.getConfig()
),
null,
false
@@ -699,7 +700,7 @@ private static DataSchema createDataSchema(
return new DataSchema(
dataSource,
- new TimestampSpec(null, null, null),
+ new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
finalDimensionsSpec,
finalMetricsSpec,
uniformGranularitySpec,
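[Editor's note] The compaction data schema now names the `__time` column explicitly with the `millis` format, instead of relying on the defaults of `TimestampSpec(null, null, null)`. A small sketch of what that spec does when applied to a row map, assuming Druid core and Guava on the classpath (the row values are illustrative):

```java
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.joda.time.DateTime;

public class TimeColumnTimestampSpecExample
{
  public static void main(String[] args)
  {
    // The spec CompactionTask now builds: read __time as epoch milliseconds.
    TimestampSpec spec = new TimestampSpec("__time", "millis", null);

    Map<String, Object> row = ImmutableMap.of("__time", 1609459200000L, "dim1", "a");
    DateTime timestamp = spec.extractTimestamp(row);
    System.out.println(timestamp); // 2021-01-01T00:00:00.000Z
  }
}
```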
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
index 05ac79e733d6..63ebba8daf7c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
@@ -76,6 +76,7 @@ public static SegmentsAndCommitMetadata process(
? (DynamicPartitionsSpec) partitionsSpec
: null;
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+
try (
final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
tmpDir,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 3ba936ebbdaa..01b3d95672e0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -62,6 +62,10 @@
import java.util.Map;
import java.util.stream.Stream;
+/**
+ * @deprecated use {@link DruidInputSource} instead
+ */
+@Deprecated
public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
{
private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index f10e40b17f46..c9d0f4e464b2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -22,10 +22,10 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.AbstractInputSource;
@@ -39,10 +39,11 @@
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
-import org.apache.druid.indexing.common.ReingestionTimelineUtils;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.RetryPolicy;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -50,6 +51,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
@@ -70,15 +72,27 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
+/**
+ * An {@link org.apache.druid.data.input.InputSource} that allows reading from Druid segments.
+ *
+ * Used internally by {@link org.apache.druid.indexing.common.task.CompactionTask}, and can also be used directly.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
public class DruidInputSource extends AbstractInputSource implements SplittableInputSource<List<WindowedSegmentId>>
{
private static final Logger LOG = new Logger(DruidInputSource.class);
+ /**
+ * Timestamp formats that the standard __time column can be parsed with.
+ */
+ private static final List<String> STANDARD_TIME_COLUMN_FORMATS = ImmutableList.of("millis", "auto");
+
/**
* A Comparator that orders {@link WindowedSegmentId} mainly by segmentId (which is important), and then by intervals
* (which is arbitrary, and only here for totality of ordering).
@@ -113,12 +127,21 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
@Nullable
private final List<WindowedSegmentId> segmentIds;
private final DimFilter dimFilter;
- private final List<String> dimensions;
- private final List<String> metrics;
private final IndexIO indexIO;
private final CoordinatorClient coordinatorClient;
private final SegmentLoaderFactory segmentLoaderFactory;
private final RetryPolicyFactory retryPolicyFactory;
+ private final TaskConfig taskConfig;
+
+ /**
+ * Included for serde backwards-compatibility only. Not used.
+ */
+ private final List<String> dimensions;
+
+ /**
+ * Included for serde backwards-compatibility only. Not used.
+ */
+ private final List<String> metrics;
@JsonCreator
public DruidInputSource(
@@ -133,7 +156,8 @@ public DruidInputSource(
@JacksonInject IndexIO indexIO,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
- @JacksonInject RetryPolicyFactory retryPolicyFactory
+ @JacksonInject RetryPolicyFactory retryPolicyFactory,
+ @JacksonInject TaskConfig taskConfig
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
@@ -150,6 +174,7 @@ public DruidInputSource(
this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
+ this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig");
}
@JsonProperty
@@ -167,7 +192,6 @@ public Interval getInterval()
@Nullable
@JsonProperty("segments")
- @JsonInclude(Include.NON_NULL)
public List<WindowedSegmentId> getSegmentIds()
{
return segmentIds;
@@ -179,12 +203,18 @@ public DimFilter getDimFilter()
return dimFilter;
}
+ /**
+ * Included for serde backwards-compatibility only. Not used.
+ */
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
+ /**
+ * Included for serde backwards-compatibility only. Not used.
+ */
@JsonProperty
public List<String> getMetrics()
{
@@ -207,28 +237,38 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu
.from(partitionHolder)
.transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
}).iterator();
- final List<String> effectiveDimensions = ReingestionTimelineUtils.getDimensionsToReingest(
- dimensions,
- inputRowSchema.getDimensionsSpec(),
- timeline
- );
- List<String> effectiveMetrics;
- if (metrics == null) {
- effectiveMetrics = ReingestionTimelineUtils.getUniqueMetrics(timeline);
+ final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter);
+
+ final InputRowSchema inputRowSchemaToUse;
+
+ if (taskConfig.isIgnoreTimestampSpecForDruidInputSource()) {
+ // Legacy compatibility mode; see https://github.com/apache/druid/pull/10267.
+ LOG.warn("Ignoring the provided timestampSpec and reading the __time column instead. To use timestampSpecs with "
+ + "the 'druid' input source, set druid.indexer.task.ignoreTimestampSpecForDruidInputSource to false.");
+
+ inputRowSchemaToUse = new InputRowSchema(
+ new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, STANDARD_TIME_COLUMN_FORMATS.iterator().next(), null),
+ inputRowSchema.getDimensionsSpec(),
+ inputRowSchema.getColumnsFilter().plus(ColumnHolder.TIME_COLUMN_NAME)
+ );
} else {
- effectiveMetrics = metrics;
+ inputRowSchemaToUse = inputRowSchema;
}
- final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(
- indexIO,
- dimFilter,
- effectiveDimensions,
- effectiveMetrics
- );
+ if (ColumnHolder.TIME_COLUMN_NAME.equals(inputRowSchemaToUse.getTimestampSpec().getTimestampColumn())
+ && !STANDARD_TIME_COLUMN_FORMATS.contains(inputRowSchemaToUse.getTimestampSpec().getTimestampFormat())) {
+ // Slight chance the user did this intentionally, but not likely. Log a warning.
+ LOG.warn(
+ "The provided timestampSpec refers to the %s column without using format %s. If you wanted to read the "
+ + "column as-is, switch formats.",
+ inputRowSchemaToUse.getTimestampSpec().getTimestampColumn(),
+ STANDARD_TIME_COLUMN_FORMATS
+ );
+ }
return new InputEntityIteratingReader(
- inputRowSchema,
+ inputRowSchemaToUse,
inputFormat,
entityIterator,
temporaryDirectory
@@ -300,7 +340,8 @@ public SplittableInputSource<List<WindowedSegmentId>> withSplit(InputSplit<List
@@ public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
CoordinatorClient coordinatorClient,
RetryPolicyFactory retryPolicyFactory,
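[Editor's note] The reader above now warns when a timestampSpec names `__time` but uses a format outside `STANDARD_TIME_COLUMN_FORMATS`. A standalone restatement of that check, mirroring (not calling) the private logic in `fixedFormatReader`:

```java
import java.util.List;

import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.TimestampSpec;

public class TimeColumnFormatCheckExample
{
  private static final List<String> STANDARD_TIME_COLUMN_FORMATS = ImmutableList.of("millis", "auto");

  public static void main(String[] args)
  {
    // Names __time but parses it with a non-standard format: almost
    // certainly a mistake, hence the warning in the reader above.
    TimestampSpec suspect = new TimestampSpec("__time", "iso", null);
    boolean shouldWarn = "__time".equals(suspect.getTimestampColumn())
        && !STANDARD_TIME_COLUMN_FORMATS.contains(suspect.getTimestampFormat());
    System.out.println(shouldWarn); // true

    // The recommended pairing reads the column as-is.
    TimestampSpec recommended = new TimestampSpec("__time", "millis", null);
    System.out.println(STANDARD_TIME_COLUMN_FORMATS.contains(recommended.getTimestampFormat())); // true
  }
}
```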
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
index 80f87721357c..4d028596ff08 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
@@ -27,26 +27,19 @@
import org.apache.druid.segment.IndexIO;
import java.io.File;
-import java.util.List;
public class DruidSegmentInputFormat implements InputFormat
{
private final IndexIO indexIO;
private final DimFilter dimFilter;
- private List<String> dimensions;
- private List<String> metrics;
- DruidSegmentInputFormat(
+ public DruidSegmentInputFormat(
IndexIO indexIO,
- DimFilter dimFilter,
- List<String> dimensions,
- List<String> metrics
+ DimFilter dimFilter
)
{
this.indexIO = indexIO;
this.dimFilter = dimFilter;
- this.dimensions = dimensions;
- this.metrics = metrics;
}
@Override
@@ -65,8 +58,9 @@ public InputEntityReader createReader(
return new DruidSegmentReader(
source,
indexIO,
- dimensions,
- metrics,
+ inputRowSchema.getTimestampSpec(),
+ inputRowSchema.getDimensionsSpec(),
+ inputRowSchema.getColumnsFilter(),
dimFilter,
temporaryDirectory
);
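[Editor's note] With the explicit dimension and metric lists gone, everything `DruidSegmentReader` needs now travels through `InputRowSchema`. A sketch of the per-read configuration a caller would assemble before handing it to the input format (column names illustrative):

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;

public class DruidSegmentSchemaExample
{
  public static void main(String[] args)
  {
    // timestampSpec, dimensionsSpec, and a ColumnsFilter replace the old
    // dimension/metric list constructor arguments.
    InputRowSchema schema = new InputRowSchema(
        new TimestampSpec("__time", "millis", null),
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
        ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "dim1", "dim2", "count"))
    );

    System.out.println(schema.getColumnsFilter().apply("count"));   // true
    System.out.println(schema.getColumnsFilter().apply("ignored")); // false
  }
}
```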
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index 25ada0a178f8..8e3bfe7108a2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -21,12 +21,18 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -35,56 +41,64 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.utils.CollectionUtils;
-import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.Set;
public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
diff --git a/web-console/src/druid-models/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec.tsx
--- a/web-console/src/druid-models/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec.tsx
),
},
- {
- name: 'inputSource.dimensions',
- label: 'Dimensions',
- type: 'string-array',
- placeholder: '(optional)',
- hideInMore: true,
- info: (
- <p>
- The list of dimensions to select. If left empty, no dimensions are returned. If left
- null or not defined, all dimensions are returned.
- </p>
- ),
- },
- {
- name: 'inputSource.metrics',
- label: 'Metrics',
- type: 'string-array',
- placeholder: '(optional)',
- hideInMore: true,
- info: (
- <p>
- The list of metrics to select. If left empty, no metrics are returned. If left null or
- not defined, all metrics are selected.
- </p>
- ),
- },
{
name: 'inputSource.filter',
label: 'Filter',
diff --git a/web-console/src/druid-models/timestamp-spec.tsx b/web-console/src/druid-models/timestamp-spec.tsx
index b6c595b17dab..f6a8263998fa 100644
--- a/web-console/src/druid-models/timestamp-spec.tsx
+++ b/web-console/src/druid-models/timestamp-spec.tsx
@@ -32,11 +32,18 @@ import { Transform } from './transform-spec';
const NO_SUCH_COLUMN = '!!!_no_such_column_!!!';
+export const TIME_COLUMN = '__time';
+
export const PLACEHOLDER_TIMESTAMP_SPEC: TimestampSpec = {
column: NO_SUCH_COLUMN,
missingValue: '1970-01-01T00:00:00Z',
};
+export const REINDEX_TIMESTAMP_SPEC: TimestampSpec = {
+ column: TIME_COLUMN,
+ format: 'millis',
+};
+
export const CONSTANT_TIMESTAMP_SPEC: TimestampSpec = {
column: NO_SUCH_COLUMN,
missingValue: '2010-01-01T00:00:00Z',
@@ -48,7 +55,7 @@ export function getTimestampSchema(spec: IngestionSpec): TimestampSchema {
const transforms: Transform[] =
deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || EMPTY_ARRAY;
- const timeTransform = transforms.find(transform => transform.name === '__time');
+ const timeTransform = transforms.find(transform => transform.name === TIME_COLUMN);
if (timeTransform) return 'expression';
const timestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec') || EMPTY_OBJECT;
@@ -74,7 +81,7 @@ export function getTimestampSpecExpressionFromSpec(spec: IngestionSpec): string
const transforms: Transform[] =
deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || EMPTY_ARRAY;
- const timeTransform = transforms.find(transform => transform.name === '__time');
+ const timeTransform = transforms.find(transform => transform.name === TIME_COLUMN);
if (!timeTransform) return;
return timeTransform.expression;
}
diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts
index 3f37c9edc951..0bd9ae8e36c9 100644
--- a/web-console/src/utils/sampler.ts
+++ b/web-console/src/utils/sampler.ts
@@ -29,6 +29,8 @@ import {
isDruidSource,
MetricSpec,
PLACEHOLDER_TIMESTAMP_SPEC,
+ REINDEX_TIMESTAMP_SPEC,
+ TIME_COLUMN,
TimestampSpec,
Transform,
TransformSpec,
@@ -152,13 +154,13 @@ export function headerFromSampleResponse(options: HeaderFromSampleResponseOption
let columns = sortWithPrefixSuffix(
dedupe(sampleResponse.data.flatMap(s => (s.parsed ? Object.keys(s.parsed) : []))).sort(),
- columnOrder || ['__time'],
+ columnOrder || [TIME_COLUMN],
suffixColumnOrder || [],
alphanumericCompare,
);
if (ignoreTimeColumn) {
- columns = columns.filter(c => c !== '__time');
+ columns = columns.filter(c => c !== TIME_COLUMN);
}
return columns;
@@ -290,7 +292,7 @@ export async function sampleForConnect(
ioConfig,
dataSchema: {
dataSource: 'sample',
- timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC,
+ timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
},
} as any,
@@ -338,13 +340,15 @@ export async function sampleForParser(
sampleStrategy,
);
+ const reingestMode = isDruidSource(spec);
+
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
ioConfig,
dataSchema: {
dataSource: 'sample',
- timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC,
+ timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
},
},
@@ -398,7 +402,7 @@ export async function sampleForTimestamp(
dimensionsSpec: {},
timestampSpec,
transformSpec: {
- transforms: transforms.filter(transform => transform.name === '__time'),
+ transforms: transforms.filter(transform => transform.name === TIME_COLUMN),
},
},
},
@@ -459,7 +463,7 @@ export async function sampleForTransform(
headerFromSampleResponse({
sampleResponse: sampleResponseHack,
ignoreTimeColumn: true,
- columnOrder: ['__time'].concat(inputFormatColumns),
+ columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
}).concat(transforms.map(t => t.name)),
);
}
@@ -518,7 +522,7 @@ export async function sampleForFilter(
headerFromSampleResponse({
sampleResponse: sampleResponseHack,
ignoreTimeColumn: true,
- columnOrder: ['__time'].concat(inputFormatColumns),
+ columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
}).concat(transforms.map(t => t.name)),
);
}
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx
index 783b44883df3..447c9f64c2ad 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -55,6 +55,7 @@ import {
} from '../../components';
import { FormGroupWithInfo } from '../../components/form-group-with-info/form-group-with-info';
import { AsyncActionDialog } from '../../dialogs';
+import { TIME_COLUMN } from '../../druid-models';
import {
addTimestampTransform,
adjustId,
@@ -1221,8 +1222,8 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDataViewState>
- .filter(k => k !== '__time' && !aggregators[k])
+ .filter(k => k !== TIME_COLUMN && !aggregators[k])
.map(k => ({
name: k,
type: String(inputData.columns![k].type || 'string').toLowerCase(),
@@ -1455,7 +1456,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDataViewState>