<groupId>org.apache.druid</groupId>
<artifactId>druid-services</artifactId>
<version>${project.parent.version}</version>
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
index d44f3ad6593f..6c04d2eebbf2 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
@@ -26,6 +26,7 @@
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.sync.CatalogClient;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.TaskBuilder;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.StringUtils;
@@ -34,13 +35,11 @@
import org.apache.druid.server.coordinator.CatalogDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.testing.embedded.EmbeddedBroker;
-import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
-import org.apache.druid.testing.embedded.indexing.Resources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Period;
@@ -51,7 +50,8 @@
public class CatalogCompactionTest extends EmbeddedClusterTestBase
{
- private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord()
+ .addProperty("druid.catalog.client.maxSyncRetries", "0");
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator()
.addProperty("druid.manager.segments.useIncrementalCache", "always");
private final EmbeddedBroker broker = new EmbeddedBroker()
@@ -64,8 +64,8 @@ protected EmbeddedDruidCluster createCluster()
.useLatchableEmitter()
.addExtension(CatalogClientModule.class)
.addExtension(CatalogCoordinatorModule.class)
- .addServer(coordinator)
.addServer(overlord)
+ .addServer(coordinator)
.addServer(broker)
.addServer(new EmbeddedIndexer())
.addServer(new EmbeddedHistorical());
@@ -81,7 +81,7 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
);
- Assertions.assertEquals(10, segments.size());
+ Assertions.assertEquals(3, segments.size());
segments.forEach(
segment -> Assertions.assertTrue(Granularities.DAY.isAligned(segment.getInterval()))
);
@@ -136,21 +136,26 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity()
private void runIngestionAtDayGranularity()
{
final String taskId = IdUtils.getRandomId();
- final Object task = createIndexTaskForInlineData(
- taskId,
- StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n")
- );
+ final Object task = createIndexTaskForInlineData(taskId);
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
cluster.callApi().waitForTaskToSucceed(taskId, overlord);
}
- private Object createIndexTaskForInlineData(String taskId, String inlineDataCsv)
+ private Object createIndexTaskForInlineData(String taskId)
{
- return EmbeddedClusterApis.createTaskFromPayload(
- taskId,
- StringUtils.format(Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, inlineDataCsv, dataSource)
- );
+ final String inlineDataCsv =
+ "2025-06-01T00:00:00.000Z,shirt,105"
+ + "\n2025-06-02T00:00:00.000Z,trousers,210"
+ + "\n2025-06-03T00:00:00.000Z,jeans,150";
+ return TaskBuilder.ofTypeIndex()
+ .dataSource(dataSource)
+ .isoTimestampColumn("time")
+ .csvInputFormatWithColumns("time", "item", "value")
+ .inlineInputSourceWithData(inlineDataCsv)
+ .segmentGranularity("DAY")
+ .dimensions()
+ .withId(taskId);
}
private void enableCompactionSupervisor()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
new file mode 100644
index 000000000000..e616471034c0
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.GranularitySpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.joda.time.Interval;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Builder for the {@link Task} objects.
+ *
+ * The builder does not use any defaults and all required fields must be set
+ * explicitly.
+ *
+ * @param <T> Type of task created by this builder.
+ * @param <C> Type of tuning config used by this builder.
+ * @see #ofTypeIndex()
+ * @see #tuningConfig(Consumer) to specify the {@code tuningConfig}.
+ */
+@SuppressWarnings("unchecked")
+public abstract class TaskBuilder, C, T extends Task>
+{
+ // Fields are package-protected to allow access by subclasses like Index, Compact
+ InputSource inputSource = null;
+ InputFormat inputFormat = null;
+
+ final Map context = new HashMap<>();
+
+ Boolean appendToExisting = null;
+
+ final TuningConfigBuilder tuningConfig;
+
+ private TaskBuilder()
+ {
+ this.tuningConfig = tuningConfigBuilder();
+ }
+
+ /**
+ * Builds the {@code Task} with the given ID, which may be submitted to
+ * the Overlord using {@code OverlordClient.runTask()}.
+ */
+ public abstract T withId(String taskId);
+
+ public abstract B dataSource(String dataSource);
+
+ protected abstract TuningConfigBuilder tuningConfigBuilder();
+
+ /**
+ * Initializes builder for a new {@link IndexTask}.
+ */
+ public static Index ofTypeIndex()
+ {
+ return new Index();
+ }
+
+ /**
+ * Initializes builder for a new {@link ParallelIndexSupervisorTask}.
+ */
+ public static IndexParallel ofTypeIndexParallel()
+ {
+ return new IndexParallel();
+ }
+
+ public static Compact ofTypeCompact()
+ {
+ return new Compact();
+ }
+
+ public B inputSource(InputSource inputSource)
+ {
+ this.inputSource = inputSource;
+ return (B) this;
+ }
+
+ public B inlineInputSourceWithData(String data)
+ {
+ return inputSource(new InlineInputSource(data));
+ }
+
+ public B druidInputSource(String dataSource, Interval interval)
+ {
+ return inputSource(
+ new DruidInputSource(
+ dataSource,
+ interval,
+ null,
+ null,
+ null,
+ null,
+ TestHelper.getTestIndexIO(),
+ new NoopCoordinatorClient(),
+ new SegmentCacheManagerFactory(TestHelper.getTestIndexIO(), TestHelper.JSON_MAPPER),
+ new TaskConfigBuilder().build()
+ )
+ );
+ }
+
+ /**
+ * Gets the absolute path of the given resource files and sets:
+ *
+ * "inputSource": {
+ * "type": "local",
+ * "files": [<absolute-paths-of-given-resource-files>]
+ * }
+ *
+ */
+ public B localInputSourceWithFiles(String... resources)
+ {
+ try {
+ final List files = new ArrayList<>();
+ for (String file : resources) {
+ final URL resourceUrl = getClass().getClassLoader().getResource(file);
+ if (resourceUrl == null) {
+ throw new ISE("Could not find file[%s]", file);
+ }
+
+ files.add(new File(resourceUrl.toURI()));
+ }
+
+ return inputSource(
+ new LocalInputSource(null, null, files, null)
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public B inputFormat(InputFormat inputFormat)
+ {
+ this.inputFormat = inputFormat;
+ return (B) this;
+ }
+
+ public B jsonInputFormat()
+ {
+ return inputFormat(
+ new JsonInputFormat(null, null, null, null, null)
+ );
+ }
+
+ public B csvInputFormatWithColumns(String... columns)
+ {
+ return inputFormat(
+ new CsvInputFormat(List.of(columns), null, null, false, 0, null)
+ );
+ }
+
+ public B appendToExisting(boolean append)
+ {
+ this.appendToExisting = append;
+ return (B) this;
+ }
+
+ public B dynamicPartitionWithMaxRows(int maxRowsPerSegment)
+ {
+ tuningConfig.withPartitionsSpec(new DynamicPartitionsSpec(maxRowsPerSegment, null));
+ return (B) this;
+ }
+
+ public B tuningConfig(Consumer> updateTuningConfig)
+ {
+ updateTuningConfig.accept(tuningConfig);
+ return (B) this;
+ }
+
+ public B context(String key, Object value)
+ {
+ this.context.put(key, value);
+ return (B) this;
+ }
+
+ public abstract static class IndexCommon, C, T extends Task>
+ extends TaskBuilder
+ {
+ final DataSchema.Builder dataSchema = DataSchema.builder();
+
+ @Override
+ public B dataSource(String dataSource)
+ {
+ dataSchema.withDataSource(dataSource);
+ return (B) this;
+ }
+
+ public B dataSchema(Consumer updateDataSchema)
+ {
+ updateDataSchema.accept(dataSchema);
+ return (B) this;
+ }
+
+ public B isoTimestampColumn(String timestampColumn)
+ {
+ dataSchema.withTimestamp(new TimestampSpec(timestampColumn, "iso", null));
+ return (B) this;
+ }
+
+ public B timestampColumn(String timestampColumn)
+ {
+ dataSchema.withTimestamp(new TimestampSpec(timestampColumn, null, null));
+ return (B) this;
+ }
+
+ public B granularitySpec(GranularitySpec granularitySpec)
+ {
+ dataSchema.withGranularity(granularitySpec);
+ return (B) this;
+ }
+
+ public B granularitySpec(String segmentGranularity, String queryGranularity, Boolean rollup)
+ {
+ dataSchema.withGranularity(
+ new UniformGranularitySpec(
+ Granularity.fromString(segmentGranularity),
+ queryGranularity == null ? null : Granularity.fromString(queryGranularity),
+ rollup,
+ null
+ )
+ );
+ return (B) this;
+ }
+
+ /**
+ * Sets {@code "granularitySpec": {"segmentGranularity": <granularity>}}.
+ */
+ public B segmentGranularity(String granularity)
+ {
+ return granularitySpec(granularity, null, null);
+ }
+
+ /**
+ * Sets the given dimensions as string dimensions in the {@link DataSchema}.
+ *
+ * @see #dataSchema(Consumer) for more options
+ */
+ public B dimensions(String... dimensions)
+ {
+ dataSchema.withDimensions(
+ Stream.of(dimensions)
+ .map(StringDimensionSchema::new)
+ .collect(Collectors.toList())
+ );
+ return (B) this;
+ }
+
+ public B metricAggregates(AggregatorFactory... aggregators)
+ {
+ dataSchema.withAggregators(aggregators);
+ return (B) this;
+ }
+ }
+
+ /**
+ * Builder for {@link IndexTask} that uses a {@link IndexTask.IndexTuningConfig}.
+ */
+ public static class Index extends IndexCommon
+ {
+ @Override
+ public TuningConfigBuilder tuningConfigBuilder()
+ {
+ return TuningConfigBuilder.forIndexTask();
+ }
+
+ @Override
+ public IndexTask withId(String taskId)
+ {
+ Preconditions.checkNotNull(inputSource, "'inputSource' must be specified");
+
+ return new IndexTask(
+ taskId,
+ null,
+ new IndexTask.IndexIngestionSpec(
+ dataSchema.build(),
+ new IndexTask.IndexIOConfig(
+ inputSource,
+ inputFormat,
+ appendToExisting,
+ null
+ ),
+ tuningConfig.build()
+ ),
+ context
+ );
+ }
+ }
+
+ /**
+ * Builder for {@link ParallelIndexSupervisorTask} which uses a {@link ParallelIndexTuningConfig}.
+ */
+ public static class IndexParallel extends IndexCommon
+ {
+ @Override
+ public ParallelIndexSupervisorTask withId(String taskId)
+ {
+ Preconditions.checkNotNull(inputSource, "'inputSource' must be specified");
+ return new ParallelIndexSupervisorTask(
+ taskId,
+ null,
+ null,
+ new ParallelIndexIngestionSpec(
+ dataSchema.build(),
+ new ParallelIndexIOConfig(
+ inputSource,
+ inputFormat,
+ appendToExisting,
+ null
+ ),
+ tuningConfig.build()
+ ),
+ context
+ );
+ }
+
+ @Override
+ public TuningConfigBuilder tuningConfigBuilder()
+ {
+ return TuningConfigBuilder.forParallelIndexTask();
+ }
+ }
+
+ /**
+ * Builder for a {@link CompactionTask} which uses a {@link CompactionTask.CompactionTuningConfig}.
+ */
+ public static class Compact extends TaskBuilder
+ {
+ private String dataSource;
+ private Interval interval;
+ private DimensionsSpec dimensionsSpec;
+ private Granularity segmentGranularity;
+ private ClientCompactionTaskGranularitySpec granularitySpec;
+ private CompactionIOConfig ioConfig;
+
+ @Override
+ public Compact dataSource(String dataSource)
+ {
+ this.dataSource = dataSource;
+ return this;
+ }
+
+ public Compact interval(Interval interval)
+ {
+ this.interval = interval;
+ return this;
+ }
+
+ public Compact dimensions(String... dimensions)
+ {
+ dimensionsSpec = new DimensionsSpec(
+ Stream.of(dimensions)
+ .map(StringDimensionSchema::new)
+ .collect(Collectors.toList())
+ );
+ return this;
+ }
+
+ public Compact granularitySpec(ClientCompactionTaskGranularitySpec granularitySpec)
+ {
+ this.granularitySpec = granularitySpec;
+ return this;
+ }
+
+ public Compact segmentGranularity(Granularity segmentGranularity)
+ {
+ this.segmentGranularity = segmentGranularity;
+ return this;
+ }
+
+ public Compact ioConfig(CompactionInputSpec inputSpec, boolean allowNonAlignedInterval)
+ {
+ this.ioConfig = new CompactionIOConfig(inputSpec, allowNonAlignedInterval, null);
+ return this;
+ }
+
+ @Override
+ public CompactionTask withId(String taskId)
+ {
+ return new CompactionTask(
+ taskId,
+ null,
+ dataSource,
+ interval,
+ null,
+ ioConfig,
+ dimensionsSpec,
+ null,
+ null,
+ null,
+ segmentGranularity,
+ granularitySpec,
+ null,
+ tuningConfig.build(),
+ null,
+ null,
+ null
+ );
+ }
+
+ @Override
+ public TuningConfigBuilder tuningConfigBuilder()
+ {
+ return TuningConfigBuilder.forCompactionTask();
+ }
+ }
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITBestEffortRollupParallelIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITBestEffortRollupParallelIndexTest.java
deleted file mode 100644
index 20bdd9767a77..000000000000
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITBestEffortRollupParallelIndexTest.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.indexer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.inject.Inject;
-import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.apache.druid.testsEx.categories.BatchIndex;
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-import java.io.Closeable;
-import java.util.function.Function;
-
-@RunWith(DruidTestRunner.class)
-@Category(BatchIndex.class)
-public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTest
-{
- // This ingestion spec has a splitHintSpec of maxSplitSize of 1 to test whether or not the task can handle
- // maxSplitSize of 1 properly.
- private static final String INDEX_TASK = "/indexer/wikipedia_parallel_index_task.json";
- private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_index_queries.json";
- private static final String REINDEX_TASK = "/indexer/wikipedia_parallel_reindex_task.json";
- private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_reindex_queries.json";
- private static final String INDEX_DATASOURCE = "wikipedia_parallel_index_test";
- private static final String INDEX_INGEST_SEGMENT_DATASOURCE = "wikipedia_parallel_ingest_segment_index_test";
- private static final String INDEX_INGEST_SEGMENT_TASK = "/indexer/wikipedia_parallel_ingest_segment_index_task.json";
- private static final String INDEX_DRUID_INPUT_SOURCE_DATASOURCE = "wikipedia_parallel_druid_input_source_index_test";
- private static final String INDEX_DRUID_INPUT_SOURCE_TASK = "/indexer/wikipedia_parallel_druid_input_source_index_task.json";
-
- private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
- CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
- private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
- CoordinatorDynamicConfig.builder().build();
-
- @Inject
- CoordinatorResourceTestClient coordinatorClient;
-
- @Test
- public void testIndexData() throws Exception
- {
- PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null);
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- final Closeable ignored2 = unloader(INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix());
- final Closeable ignored3 = unloader(INDEX_DRUID_INPUT_SOURCE_DATASOURCE + config.getExtraDatasourceNameSuffix())
- ) {
- boolean forceGuaranteedRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
- Assert.assertFalse("parititionSpec does not support best-effort rollup", forceGuaranteedRollup);
-
- final Function rollupTransform = spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%FORCE_GUARANTEED_ROLLUP%%",
- Boolean.toString(false)
- );
- spec = StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("0")
- );
- return StringUtils.replace(
- spec,
- "%%PARTITIONS_SPEC%%",
- jsonMapper.writeValueAsString(partitionsSpec)
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- rollupTransform,
- INDEX_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(false, false)
- );
-
- // Index again, this time only choosing the second data file, and without explicit intervals chosen.
- // The second datafile covers both day segments, so this should replace them, as reflected in the queries.
- doIndexTest(
- INDEX_DATASOURCE,
- REINDEX_TASK,
- rollupTransform,
- REINDEX_QUERIES_RESOURCE,
- true,
- true,
- true,
- new Pair<>(false, false)
- );
-
- doReindexTest(
- INDEX_DATASOURCE,
- INDEX_INGEST_SEGMENT_DATASOURCE,
- rollupTransform,
- INDEX_INGEST_SEGMENT_TASK,
- REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
-
- // with DruidInputSource
- doReindexTest(
- INDEX_DATASOURCE,
- INDEX_DRUID_INPUT_SOURCE_DATASOURCE,
- rollupTransform,
- INDEX_DRUID_INPUT_SOURCE_TASK,
- REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
- }
- }
-
- /**
- * Test a non zero value for awaitSegmentAvailabilityTimeoutMillis. This will confirm that the report for the task
- * indicates segments were confirmed to be available on the cluster before finishing the ingestion job.
- */
- @Test
- public void testIndexDataVerifySegmentAvailability() throws Exception
- {
- PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null);
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- ) {
- boolean forceGuaranteedRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
- Assert.assertFalse("parititionSpec does not support best-effort rollup", forceGuaranteedRollup);
-
- final Function rollupTransform = spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%FORCE_GUARANTEED_ROLLUP%%",
- Boolean.toString(false)
- );
- spec = StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("600000")
- );
- return StringUtils.replace(
- spec,
- "%%PARTITIONS_SPEC%%",
- jsonMapper.writeValueAsString(partitionsSpec)
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- rollupTransform,
- INDEX_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(true, true)
- );
- }
- }
-
- /**
- * Test a non zero value for awaitSegmentAvailabilityTimeoutMillis. Setting the config value to 1 millis
- * and pausing coordination to confirm that the task will still succeed even if the job was not able to confirm the
- * segments were loaded by the time the timeout occurs.
- */
- @Test
- public void testIndexDataAwaitSegmentAvailabilityFailsButTaskSucceeds() throws Exception
- {
- PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null);
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- ) {
- coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
- boolean forceGuaranteedRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
- Assert.assertFalse("parititionSpec does not support best-effort rollup", forceGuaranteedRollup);
-
- final Function rollupTransform = spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%FORCE_GUARANTEED_ROLLUP%%",
- Boolean.toString(false)
- );
- spec = StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("1")
- );
- return StringUtils.replace(
- spec,
- "%%PARTITIONS_SPEC%%",
- jsonMapper.writeValueAsString(partitionsSpec)
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- rollupTransform,
- INDEX_QUERIES_RESOURCE,
- false,
- false,
- false,
- new Pair<>(true, false)
- );
- coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
- ITRetryUtil.retryUntilTrue(
- () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()), "Segment Load"
- );
- }
- }
-}
diff --git a/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_index_queries.json b/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_index_queries.json
deleted file mode 100644
index 193e69fbc9ca..000000000000
--- a/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_index_queries.json
+++ /dev/null
@@ -1,80 +0,0 @@
-[
- {
- "description": "timeseries, 1 agg, all",
- "query":{
- "queryType" : "timeBoundary",
- "dataSource": "%%DATASOURCE%%"
- },
- "expectedResults":[
- {
- "timestamp" : "2015-09-12T00:00:00.000Z",
- "result" : {
- "minTime" : "2015-09-12T00:00:00.000Z",
- "maxTime" : "2015-09-12T00:00:00.000Z"
- }
- }
- ]
- },
- {
- "description": "scan, all",
- "query": {
- "queryType": "scan",
- "dataSource": "%%DATASOURCE%%",
- "intervals": [
- "2013-01-01/2020-01-02"
- ],
- "resultFormat":"compactedList"
- },
- "expectedResults": %%EXPECTED_SCAN_RESULT%%,
- "fieldsToTest": ["events"]
- },
- {
- "description": "roll up ratio",
- "query": {
- "queryType":"timeseries",
- "dataSource":{
- "type":"table",
- "name":"%%DATASOURCE%%"
- },
- "intervals":{
- "type":"intervals",
- "intervals":[
- "2013-01-01/2020-01-02"
- ]
- },
- "granularity":{
- "type":"all"
- },
- "aggregations":[
- {
- "type":"count",
- "name":"a0"
- },
- {
- "type":"longSum",
- "name":"a1",
- "fieldName":"count",
- "expression":null
- }
- ],
- "postAggregations":[
- {
- "type":"expression",
- "name":"p0",
- "expression":"((\"a0\" * 1.00) / \"a1\")",
- "ordering":null
- }
- ]
- },
- "expectedResults": [
- {
- "timestamp" : "2015-09-12T00:00:00.000Z",
- "result" : {
- "a1" : %%EXPECTED_SUM_COUNT%%,
- "p0" : %%EXPECTED_ROLLUP_RATIO%%,
- "a0" : %%EXPECTED_NUM_ROW%%
- }
- }
- ]
- }
-]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_index_task.json b/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_index_task.json
deleted file mode 100644
index 3a21a856ac6f..000000000000
--- a/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_index_task.json
+++ /dev/null
@@ -1,57 +0,0 @@
-{
- "type": "index_parallel",
- "spec": {
- "ioConfig": {
- "type": "index_parallel",
- "inputSource": {
- "type": "inline",
- "data": "{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"F\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"J\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"Z\",\"dimB\":\"S\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"Z\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"J\",\"dimB\":\"R\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"T\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimC\":\"A\",\"dimB\":\"X\",\"metA\":1}\n"
- },
- "inputFormat": {
- "type": "json"
- }
- },
- "tuningConfig": {
- "type": "index_parallel",
- "partitionsSpec": {
- "type": "dynamic",
- "maxRowsPerSegment": 3,
- "maxTotalRows": 3
- },
- "maxRowsInMemory": 3
- },
- "dataSchema": {
- "dataSource": "%%DATASOURCE%%",
- "timestampSpec": {
- "column": "time",
- "format": "iso"
- },
- "dimensionsSpec": {
- "dimensions": [
- "dimB",
- "dimA",
- "dimC",
- "dimD",
- "dimE",
- "dimF"
- ]
- },
- "granularitySpec": {
- "queryGranularity": "hour",
- "rollup": true,
- "segmentGranularity": "hour"
- },
- "metricsSpec": [
- {
- "name": "count",
- "type": "count"
- },
- {
- "name": "sum_metA",
- "type": "longSum",
- "fieldName": "metA"
- }
- ]
- }
- }
-}
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_with_dim_compaction_task.json b/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_with_dim_compaction_task.json
deleted file mode 100644
index 9416a3f6bdad..000000000000
--- a/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_with_dim_compaction_task.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
- "type": "compact",
- "dataSource": "%%DATASOURCE%%",
- "dimensionsSpec": {
- "dimensions": %%DIMENSION_NAMES%%
- },
- "interval": "2010-10-29T05:00:00Z/2030-10-29T06:00:00Z",
- "tuningConfig": {
- "type": "index_parallel",
- "maxRowsPerSegment": 3,
- "maxRowsInMemory": 3,
- "maxNumConcurrentSubTasks": 2,
- "partitionsSpec": {
- "type": "hashed",
- "numShards": 1
- },
- "forceGuaranteedRollup": true
- }
-}
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_without_dim_compaction_task.json b/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_without_dim_compaction_task.json
deleted file mode 100644
index a149d7a25127..000000000000
--- a/integration-tests-ex/cases/src/test/resources/indexer/sparse_column_without_dim_compaction_task.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
- "type": "compact",
- "dataSource": "%%DATASOURCE%%",
- "interval": "2010-10-29T05:00:00Z/2030-10-29T06:00:00Z",
- "tuningConfig": {
- "type": "index_parallel",
- "maxRowsPerSegment": 3,
- "maxRowsInMemory": 3,
- "maxNumConcurrentSubTasks": 2,
- "partitionsSpec": {
- "type": "hashed",
- "numShards": 1
- },
- "forceGuaranteedRollup": true
- }
-}
\ No newline at end of file
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 9ba40a8d1d89..601754a27007 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -41,14 +41,8 @@ public class TestNGGroup
public static final String KAFKA_DATA_FORMAT = "kafka-data-format";
- public static final String COMPACTION = "compaction";
-
- public static final String UPGRADE = "upgrade";
-
public static final String APPEND_INGESTION = "append-ingestion";
- public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
-
/**
* This group can only be run individually using -Dgroups=query since it requires specific test data setup.
*/
@@ -152,12 +146,8 @@ public class TestNGGroup
public static final String HIGH_AVAILABILTY = "high-availability";
- public static final String SHUFFLE_DEEP_STORE = "shuffle-deep-store";
-
public static final String CUSTOM_COORDINATOR_DUTIES = "custom-coordinator-duties";
- public static final String HTTP_ENDPOINT = "http-endpoint";
-
public static final String CENTRALIZED_DATASOURCE_SCHEMA = "centralized-datasource-schema";
public static final String CDS_TASK_SCHEMA_PUBLISH_DISABLED = "cds-task-schema-publish-disabled";
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceNotFoundTest.java b/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceNotFoundTest.java
deleted file mode 100644
index 8959c7744fe0..000000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceNotFoundTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.api;
-
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.testing.clients.OverlordResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.Assert;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.util.function.Consumer;
-
-@Test(groups = TestNGGroup.HTTP_ENDPOINT)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITOverlordResourceNotFoundTest
-{
- @Inject
- protected OverlordResourceTestClient indexer;
-
- @Test
- public void testGetSupervisorStatusNotFound()
- {
- callAndCheckNotFound(indexer::getSupervisorStatus);
- }
-
- @Test
- public void testGetSupervisorHistoryNotFound()
- {
- callAndCheckNotFound(indexer::getSupervisorHistory);
- }
-
- @Test
- public void testResumeSupervisorNotFound()
- {
- callAndCheckNotFound(indexer::resumeSupervisor);
- }
-
- @Test
- public void testSuspendSupervisorNotFound()
- {
- callAndCheckNotFound(indexer::suspendSupervisor);
- }
-
- @Test
- public void testShutdownSupervisorNotFound()
- {
- callAndCheckNotFound(indexer::shutdownSupervisor);
- }
-
- @Test
- public void testTerminateSupervisorNotFound()
- {
- callAndCheckNotFound(indexer::terminateSupervisor);
- }
-
- @Test
- public void testGetSupervisorHealthNotFound()
- {
- callAndCheckNotFound(indexer::getSupervisorHealth);
- }
-
- @Test
- public void testStatsSupervisorNotFound()
- {
- callAndCheckNotFound(indexer::statsSupervisor);
- }
-
- @Test
- public void testResetSupervisorNotFound()
- {
- callAndCheckNotFound(indexer::resetSupervisor);
- }
-
- @Test
- public void testGetTaskStatusNotFound()
- {
- callAndCheckNotFound(indexer::getTaskStatus);
- }
-
- @Test
- public void testShutdownTaskNotFound()
- {
- callAndCheckNotFound(indexer::shutdownTask);
- }
-
- @Test
- public void testGetTaskLogNotFound()
- {
- callAndCheckNotFound(indexer::getTaskLog);
- }
-
- @Test
- public void testGetTaskReportNotFound()
- {
- callAndCheckNotFound(indexer::getTaskReport);
- }
-
- @Test
- public void testGetTaskPayLoadNotFound()
- {
- callAndCheckNotFound(indexer::getTaskPayload);
- }
-
- private void callAndCheckNotFound(Consumer runnable)
- {
- String supervisorId = "not_exist_id";
- try {
- runnable.accept(supervisorId);
- }
- catch (ISE e) {
- // OverlordResourceTestClient turns all non-200 response into ISE exception
- // So we catch ISE and check if the message in this exception matches expected message
- Assert.assertTrue(
- e.getMessage().contains("[404 Not Found") && e.getMessage().contains(supervisorId),
- "Unexpected exception. Message does not match expected. " + e.getMessage()
- );
- return;
- }
- Assert.fail("Should not go to here");
- }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceTest.java
deleted file mode 100644
index eb59afee0f76..000000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.api;
-
-import com.google.inject.Inject;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.testing.clients.OverlordResourceTestClient;
-import org.apache.druid.testing.clients.TaskResponseObject;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractIndexerTest;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.List;
-
-@Test(groups = TestNGGroup.HTTP_ENDPOINT)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITOverlordResourceTest
-{
- private static final String INGESTION_SPEC = "/api/overlord-resource-test-task.json";
-
- @Inject
- protected OverlordResourceTestClient indexer;
-
- @Test
- public void testGetAllTasks() throws IOException
- {
- final String taskSpec = AbstractIndexerTest.getResourceAsString(INGESTION_SPEC);
- final String taskId = indexer.submitTask(taskSpec);
-
- ITRetryUtil.retryUntil(
- () -> {
- final List tasks = indexer.getAllTasks();
- final TaskResponseObject taskStatus = tasks
- .stream()
- .filter(task -> taskId.equals(task.getId()))
- .findAny()
- .orElseThrow(() -> new ISE("Cannot find task[%s]", taskId));
- TaskState status = taskStatus.getStatus();
- if (status == TaskState.FAILED) {
- throw new ISE("Task[%s] FAILED", taskId);
- }
- return status == TaskState.SUCCESS;
- },
- true,
- ITRetryUtil.DEFAULT_RETRY_SLEEP,
- ITRetryUtil.DEFAULT_RETRY_COUNT,
- taskId
- );
- }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
deleted file mode 100644
index 11e4ec6cad30..000000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.coordinator.duty;
-
-import com.google.inject.Inject;
-import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.LockFilterPolicy;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.server.coordinator.ClusterCompactionConfig;
-import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
-import org.apache.druid.testing.clients.CompactionResourceTestClient;
-import org.apache.druid.testing.clients.TaskResponseObject;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.utils.EventSerializer;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.apache.druid.testing.utils.KafkaUtil;
-import org.apache.druid.testing.utils.StreamEventWriter;
-import org.apache.druid.testing.utils.StreamGenerator;
-import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-import org.joda.time.Period;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * Integration Test to verify behaviour when there is a lock contention between
- * compaction tasks and on-going stream ingestion tasks.
- */
-@Test(groups = {TestNGGroup.COMPACTION})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingServiceTest
-{
- private static final Logger LOG = new Logger(ITAutoCompactionLockContentionTest.class);
-
- @Inject
- private CompactionResourceTestClient compactionResource;
-
- private GeneratedTestConfig generatedTestConfig;
- private StreamGenerator streamGenerator;
-
- private String fullDatasourceName;
-
- @DataProvider
- public static Object[] getParameters()
- {
- return new Object[]{false, true};
- }
-
- @BeforeClass
- public void setupClass() throws Exception
- {
- doBeforeClass();
- }
-
- @BeforeMethod
- public void setup() throws Exception
- {
- generatedTestConfig = new GeneratedTestConfig(
- Specs.PARSER_TYPE,
- getResourceAsString(Specs.INPUT_FORMAT_PATH)
- );
- fullDatasourceName = generatedTestConfig.getFullDatasourceName();
- final EventSerializer serializer = jsonMapper.readValue(
- getResourceAsStream(Specs.SERIALIZER_PATH),
- EventSerializer.class
- );
- streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6, 100);
- }
-
- @Override
- public String getTestNamePrefix()
- {
- return "autocompact_lock_contention";
- }
-
- @Test(dataProvider = "getParameters")
- public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) throws Exception
- {
- if (shouldSkipTest(transactionEnabled)) {
- return;
- }
-
- try (
- final Closeable closer = createResourceCloser(generatedTestConfig);
- final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
- ) {
- // Start supervisor
- final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
- .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
- generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
- LOG.info("supervisorSpec: [%s]", taskSpec);
-
- // Generate data for minutes 1, 2 and 3
- final Interval minute1 = Intervals.of("2000-01-01T01:01:00Z/2000-01-01T01:02:00Z");
- final long rowsForMinute1 = generateData(minute1, streamEventWriter);
-
- final Interval minute2 = Intervals.of("2000-01-01T01:02:00Z/2000-01-01T01:03:00Z");
- long rowsForMinute2 = generateData(minute2, streamEventWriter);
-
- final Interval minute3 = Intervals.of("2000-01-01T01:03:00Z/2000-01-01T01:04:00Z");
- final long rowsForMinute3 = generateData(minute3, streamEventWriter);
-
- // Wait for data to be ingested for all the minutes
- ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
-
- // Wait for the segments to be loaded and interval locks to be released
- ensureLockedIntervals();
- ensureSegmentsLoaded();
-
- // 2 segments for each minute, total 6
- ensureSegmentsCount(6);
-
- // Generate more data for minute2 so that it gets locked
- rowsForMinute2 += generateData(minute2, streamEventWriter);
- ensureLockedIntervals(minute2);
-
- // Trigger auto compaction
- submitAndVerifyCompactionConfig();
- compactionResource.forceTriggerAutoCompaction();
-
- // Wait for segments to be loaded
- ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
- ensureLockedIntervals();
- ensureSegmentsLoaded();
-
- // Verify that minute1 and minute3 have been compacted
- ensureCompactionTaskCount(2);
- verifyCompactedIntervals(minute1, minute3);
-
- // Trigger auto compaction again
- compactionResource.forceTriggerAutoCompaction();
-
- // Verify that all the segments are now compacted
- ensureCompactionTaskCount(3);
- ensureSegmentsLoaded();
- verifyCompactedIntervals(minute1, minute2, minute3);
- ensureSegmentsCount(3);
- }
- }
-
- /**
- * Retries until the segment count is as expected.
- */
- private void ensureSegmentsCount(int numExpectedSegments)
- {
- ITRetryUtil.retryUntilEquals(
- () -> coordinator.getFullSegmentsMetadata(fullDatasourceName).size(),
- numExpectedSegments,
- "Segment count"
- );
- }
-
- /**
- * Verifies that the given intervals have been compacted.
- */
- private void verifyCompactedIntervals(Interval... compactedIntervals)
- {
- List segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
- List observedCompactedSegments = new ArrayList<>();
- Set observedCompactedIntervals = new HashSet<>();
- for (DataSegment segment : segments) {
- if (segment.getLastCompactionState() != null) {
- observedCompactedSegments.add(segment);
- observedCompactedIntervals.add(segment.getInterval());
- }
- }
-
- Set expectedCompactedIntervals = new HashSet<>(Arrays.asList(compactedIntervals));
- Assert.assertEquals(observedCompactedIntervals, expectedCompactedIntervals);
-
- DynamicPartitionsSpec expectedPartitionSpec = new DynamicPartitionsSpec(
- Specs.MAX_ROWS_PER_SEGMENT,
- Long.MAX_VALUE
- );
- for (DataSegment compactedSegment : observedCompactedSegments) {
- Assert.assertNotNull(compactedSegment.getLastCompactionState());
- Assert.assertEquals(
- compactedSegment.getLastCompactionState().getPartitionsSpec(),
- expectedPartitionSpec
- );
- }
- }
-
- /**
- * Generates data points for the specified interval.
- *
- * @return Number of rows generated.
- */
- private long generateData(Interval interval, StreamEventWriter streamEventWriter)
- {
- long rowCount = streamGenerator.run(
- generatedTestConfig.getStreamName(),
- streamEventWriter,
- 10,
- interval.getStart()
- );
- LOG.info("Generated %d Rows for Interval [%s]", rowCount, interval);
-
- return rowCount;
- }
-
- /**
- * Retries until segments have been loaded.
- */
- private void ensureSegmentsLoaded()
- {
- ITRetryUtil.retryUntilTrue(
- () -> coordinator.areSegmentsLoaded(fullDatasourceName),
- "Segments are loaded"
- );
- }
-
- /**
- * Retries until the specified Intervals are locked for the current datasource.
- * If no interval has been specified, retries until no interval is locked
- */
- private void ensureLockedIntervals(Interval... intervals)
- {
- final LockFilterPolicy lockFilterPolicy = new LockFilterPolicy(fullDatasourceName, 0, null, null);
- final Set expectedLockedIntervals = Arrays.stream(intervals).collect(Collectors.toSet());
- ITRetryUtil.retryUntilEquals(
- () -> Set.copyOf(
- indexer.getLockedIntervals(List.of(lockFilterPolicy))
- .getOrDefault(fullDatasourceName, List.of())
- ),
- expectedLockedIntervals,
- "Locked intervals"
- );
- }
-
- /**
- * Checks if a test should be skipped based on whether transaction is enabled or not.
- */
- private boolean shouldSkipTest(boolean testEnableTransaction)
- {
- Map kafkaTestProps = KafkaUtil
- .getAdditionalKafkaTestConfigFromProperties(config);
- boolean configEnableTransaction = Boolean.parseBoolean(
- kafkaTestProps.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false")
- );
-
- return configEnableTransaction != testEnableTransaction;
- }
-
- /**
- * Submits a compaction config for the current datasource.
- */
- private void submitAndVerifyCompactionConfig() throws Exception
- {
- final DataSourceCompactionConfig dataSourceCompactionConfig = InlineSchemaDataSourceCompactionConfig
- .builder()
- .forDataSource(fullDatasourceName)
- .withSkipOffsetFromLatest(Period.ZERO)
- .withMaxRowsPerSegment(Specs.MAX_ROWS_PER_SEGMENT)
- .build();
- compactionResource.updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, null, null));
- compactionResource.submitCompactionConfig(dataSourceCompactionConfig);
-
- // Verify that the compaction config is updated correctly.
- DataSourceCompactionConfig observedCompactionConfig
- = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
- Assert.assertEquals(observedCompactionConfig, dataSourceCompactionConfig);
- }
-
- /**
- * Checks if the given TaskResponseObject represents a Compaction Task.
- */
- private boolean isCompactionTask(TaskResponseObject taskResponse)
- {
- return "compact".equalsIgnoreCase(taskResponse.getType());
- }
-
- /**
- * Retries until the total number of complete compaction tasks is as expected.
- */
- private void ensureCompactionTaskCount(int expectedCount)
- {
- ITRetryUtil.retryUntilEquals(
- this::getNumberOfCompletedCompactionTasks,
- expectedCount,
- "Number of completed compaction tasks"
- );
- }
-
- /**
- * Gets the number of complete compaction tasks.
- */
- private int getNumberOfCompletedCompactionTasks()
- {
- List completeTasks = indexer
- .getCompleteTasksForDataSource(fullDatasourceName);
-
- return (int) completeTasks.stream().filter(this::isCompactionTask).count();
- }
-
- /**
- * Retries until the total row count is as expected.
- */
- private void ensureRowCount(long totalRows)
- {
- ITRetryUtil.retryUntilEquals(
- () -> queryHelper.countRows(
- fullDatasourceName,
- Intervals.ETERNITY,
- name -> new LongSumAggregatorFactory(name, "count")
- ),
- totalRows,
- "Total row count in datasource"
- );
- }
-
- /**
- * Constants for test specs.
- */
- private static class Specs
- {
- static final String SERIALIZER_PATH = DATA_RESOURCE_ROOT + "/csv/serializer/serializer.json";
- static final String INPUT_FORMAT_PATH = DATA_RESOURCE_ROOT + "/csv/input_format/input_format.json";
- static final String PARSER_TYPE = AbstractStreamIndexingTest.INPUT_FORMAT;
-
- static final int MAX_ROWS_PER_SEGMENT = 10000;
- }
-
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index c35490336840..a4f8b10bad54 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -211,7 +211,7 @@ public static String getResourceAsString(String file) throws IOException
public static InputStream getResourceAsStream(String resource)
{
- return ITCompactionTaskTest.class.getResourceAsStream(resource);
+ return AbstractIndexerTest.class.getResourceAsStream(resource);
}
public static List listResources(String dir) throws IOException
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java
deleted file mode 100644
index 53deb831a2db..000000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.inject.Inject;
-import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.util.function.Function;
-
-@Test(groups = {TestNGGroup.BATCH_INDEX})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTest
-{
- // This ingestion spec has a splitHintSpec of maxSplitSize of 1 to test whether or not the task can handle
- // maxSplitSize of 1 properly.
- private static final String INDEX_TASK = "/indexer/wikipedia_parallel_index_task.json";
- private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_index_queries.json";
- private static final String REINDEX_TASK = "/indexer/wikipedia_parallel_reindex_task.json";
- private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_reindex_queries.json";
- private static final String INDEX_DATASOURCE = "wikipedia_parallel_index_test";
- private static final String INDEX_INGEST_SEGMENT_DATASOURCE = "wikipedia_parallel_ingest_segment_index_test";
- private static final String INDEX_INGEST_SEGMENT_TASK = "/indexer/wikipedia_parallel_ingest_segment_index_task.json";
- private static final String INDEX_DRUID_INPUT_SOURCE_DATASOURCE = "wikipedia_parallel_druid_input_source_index_test";
- private static final String INDEX_DRUID_INPUT_SOURCE_TASK = "/indexer/wikipedia_parallel_druid_input_source_index_task.json";
-
- private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
- CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
- private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
- CoordinatorDynamicConfig.builder().build();
-
- @Inject
- CoordinatorResourceTestClient coordinatorClient;
-
- @DataProvider
- public static Object[][] resources()
- {
- return new Object[][]{
- {new DynamicPartitionsSpec(null, null)}
- };
- }
-
- @Test(dataProvider = "resources")
- public void testIndexData(PartitionsSpec partitionsSpec) throws Exception
- {
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- final Closeable ignored2 = unloader(INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix());
- final Closeable ignored3 = unloader(INDEX_DRUID_INPUT_SOURCE_DATASOURCE + config.getExtraDatasourceNameSuffix())
- ) {
- boolean forceGuaranteedRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
- Assert.assertFalse(forceGuaranteedRollup, "parititionSpec does not support best-effort rollup");
-
- final Function rollupTransform = spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%FORCE_GUARANTEED_ROLLUP%%",
- Boolean.toString(false)
- );
- spec = StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("0")
- );
- return StringUtils.replace(
- spec,
- "%%PARTITIONS_SPEC%%",
- jsonMapper.writeValueAsString(partitionsSpec)
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- rollupTransform,
- INDEX_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(false, false)
- );
-
- // Index again, this time only choosing the second data file, and without explicit intervals chosen.
- // The second datafile covers both day segments, so this should replace them, as reflected in the queries.
- doIndexTest(
- INDEX_DATASOURCE,
- REINDEX_TASK,
- rollupTransform,
- REINDEX_QUERIES_RESOURCE,
- true,
- true,
- true,
- new Pair<>(false, false)
- );
-
- doReindexTest(
- INDEX_DATASOURCE,
- INDEX_INGEST_SEGMENT_DATASOURCE,
- rollupTransform,
- INDEX_INGEST_SEGMENT_TASK,
- REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
-
- // with DruidInputSource
- doReindexTest(
- INDEX_DATASOURCE,
- INDEX_DRUID_INPUT_SOURCE_DATASOURCE,
- rollupTransform,
- INDEX_DRUID_INPUT_SOURCE_TASK,
- REINDEX_QUERIES_RESOURCE,
- new Pair<>(false, false)
- );
- }
- }
-
- /**
- * Test a non zero value for awaitSegmentAvailabilityTimeoutMillis. This will confirm that the report for the task
- * indicates segments were confirmed to be available on the cluster before finishing the ingestion job.
- *
- * @param partitionsSpec
- * @throws Exception
- */
- @Test(dataProvider = "resources")
- public void testIndexDataVerifySegmentAvailability(PartitionsSpec partitionsSpec) throws Exception
- {
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- ) {
- boolean forceGuaranteedRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
- Assert.assertFalse(forceGuaranteedRollup, "parititionSpec does not support best-effort rollup");
-
- final Function rollupTransform = spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%FORCE_GUARANTEED_ROLLUP%%",
- Boolean.toString(false)
- );
- spec = StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("600000")
- );
- return StringUtils.replace(
- spec,
- "%%PARTITIONS_SPEC%%",
- jsonMapper.writeValueAsString(partitionsSpec)
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- rollupTransform,
- INDEX_QUERIES_RESOURCE,
- false,
- true,
- true,
- new Pair<>(true, true)
- );
- }
- }
-
- /**
- * Test a non zero value for awaitSegmentAvailabilityTimeoutMillis. Setting the config value to 1 millis
- * and pausing coordination to confirm that the task will still succeed even if the job was not able to confirm the
- * segments were loaded by the time the timeout occurs.
- *
- * @param partitionsSpec
- * @throws Exception
- */
- @Test(dataProvider = "resources")
- public void testIndexDataAwaitSegmentAvailabilityFailsButTaskSucceeds(PartitionsSpec partitionsSpec) throws Exception
- {
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- ) {
- coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
- boolean forceGuaranteedRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
- Assert.assertFalse(forceGuaranteedRollup, "parititionSpec does not support best-effort rollup");
-
- final Function rollupTransform = spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%FORCE_GUARANTEED_ROLLUP%%",
- Boolean.toString(false)
- );
- spec = StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("1")
- );
- return StringUtils.replace(
- spec,
- "%%PARTITIONS_SPEC%%",
- jsonMapper.writeValueAsString(partitionsSpec)
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- rollupTransform,
- INDEX_QUERIES_RESOURCE,
- false,
- false,
- false,
- new Pair<>(true, false)
- );
- coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
- ITRetryUtil.retryUntilTrue(
- () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()), "Segment Load"
- );
- }
- }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
deleted file mode 100644
index b974c7d20e97..000000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.google.inject.Inject;
-import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexer.report.IngestionStatsAndErrors;
-import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.GranularityType;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.IntegrationTestingConfig;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.apache.druid.tests.TestNGGroup;
-import org.joda.time.Interval;
-import org.joda.time.chrono.ISOChronology;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITCompactionTaskTest extends AbstractIndexerTest
-{
- private static final Logger LOG = new Logger(ITCompactionTaskTest.class);
- private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
- private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
-
- private static final String INDEX_QUERIES_YEAR_RESOURCE = "/indexer/wikipedia_index_queries_year_query_granularity.json";
- private static final String INDEX_QUERIES_HOUR_RESOURCE = "/indexer/wikipedia_index_queries_hour_query_granularity.json";
-
- private static final String INDEX_DATASOURCE = "wikipedia_index_test";
-
- private static final String SEGMENT_METADATA_QUERY_RESOURCE = "/indexer/segment_metadata_query.json";
-
- private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
- private static final String PARALLEL_COMPACTION_TASK = "/indexer/wikipedia_compaction_task_parallel.json";
- private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json";
- private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_compaction_task_with_granularity_spec.json";
-
- private static final String INDEX_TASK_WITH_TIMESTAMP = "/indexer/wikipedia_with_timestamp_index_task.json";
-
- @Inject
- private IntegrationTestingConfig config;
-
- private String fullDatasourceName;
-
- @BeforeMethod
- public void setFullDatasourceName(Method method)
- {
- fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix() + "-" + method.getName();
- }
-
- @Test
- public void testCompaction() throws Exception
- {
- loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, null);
- }
-
- @Test
- public void testCompactionWithSegmentGranularity() throws Exception
- {
- loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_SEGMENT_GRANULARITY, GranularityType.MONTH);
- }
-
- @Test
- public void testCompactionWithSegmentGranularityInGranularitySpec() throws Exception
- {
- loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.MONTH);
- }
-
- @Test
- public void testCompactionWithQueryGranularityInGranularitySpec() throws Exception
- {
- try (final Closeable ignored = unloader(fullDatasourceName)) {
- loadData(INDEX_TASK, fullDatasourceName);
- // 4 segments across 2 days
- checkNumberOfSegments(4);
- List expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
- expectedIntervalAfterCompaction.sort(null);
-
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
- String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
- queryHelper.testQueriesFromString(queryResponseTemplate);
- // QueryGranularity was SECOND, now we will change it to HOUR (QueryGranularity changed to coarser)
- compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.HOUR);
-
- // The original 4 segments should be compacted into 2 new segments since data only has 2 days and the compaction
- // segmentGranularity is DAY
- checkNumberOfSegments(2);
- queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
- queryHelper.testQueriesFromString(queryResponseTemplate);
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.HOUR.name(), 2);
- checkCompactionIntervals(expectedIntervalAfterCompaction);
-
- // QueryGranularity was HOUR, now we will change it to MINUTE (QueryGranularity changed to finer)
- compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.MINUTE);
-
- // There will be no change in number of segments as compaction segmentGranularity is the same and data interval
- // is the same. Since QueryGranularity is changed to finer qranularity, the data will remains the same. (data
- // will just be bucketed to a finer qranularity but roll up will not be different
- // i.e. 2020-10-29T05:00 will just be bucketed to 2020-10-29T05:00:00)
- checkNumberOfSegments(2);
- queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
- queryHelper.testQueriesFromString(queryResponseTemplate);
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.MINUTE.name(), 2);
- checkCompactionIntervals(expectedIntervalAfterCompaction);
- }
- }
-
- @Test
- public void testParallelHashedCompaction() throws Exception
- {
- try (final Closeable ignored = unloader(fullDatasourceName)) {
- loadData(INDEX_TASK, fullDatasourceName);
- // 4 segments across 2 days
- checkNumberOfSegments(4);
- List expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
- expectedIntervalAfterCompaction.sort(null);
-
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
- String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
-
- queryResponseTemplate = StringUtils.replace(
- queryResponseTemplate,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("0")
- );
-
- queryHelper.testQueriesFromString(queryResponseTemplate);
- String taskId = compactData(PARALLEL_COMPACTION_TASK, null, null);
-
- // The original 4 segments should be compacted into 2 new segments
- checkNumberOfSegments(2);
- queryHelper.testQueriesFromString(queryResponseTemplate);
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 2);
-
-
- checkCompactionIntervals(expectedIntervalAfterCompaction);
-
- Map reports = indexer.getTaskReport(taskId);
- Assert.assertTrue(reports != null && reports.size() > 0);
-
- Assert.assertEquals(
- 2,
- reports.values()
- .stream()
- .filter(r -> r instanceof IngestionStatsAndErrorsTaskReport)
- .mapToLong(r -> ((IngestionStatsAndErrors) r.getPayload()).getSegmentsPublished())
- .sum()
- );
- Assert.assertEquals(
- 4,
- reports.values()
- .stream()
- .filter(r -> r instanceof IngestionStatsAndErrorsTaskReport)
- .mapToLong(r -> ((IngestionStatsAndErrors) r.getPayload()).getSegmentsRead())
- .sum()
- );
- }
- }
-
- @Test
- public void testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() throws Exception
- {
- try (final Closeable ignored = unloader(fullDatasourceName)) {
- loadData(INDEX_TASK, fullDatasourceName);
- // 4 segments across 2 days
- checkNumberOfSegments(4);
- List expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
- expectedIntervalAfterCompaction.sort(null);
-
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
- String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
- queryHelper.testQueriesFromString(queryResponseTemplate);
- compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.YEAR, GranularityType.YEAR);
-
- // The original 4 segments should be compacted into 1 new segment
- checkNumberOfSegments(1);
- queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_YEAR_RESOURCE);
- queryHelper.testQueriesFromString(queryResponseTemplate);
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.YEAR.name(), 1);
-
- List newIntervals = new ArrayList<>();
- for (String interval : expectedIntervalAfterCompaction) {
- for (Interval newinterval : GranularityType.YEAR.getDefaultGranularity().getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
- newIntervals.add(newinterval.toString());
- }
- }
- expectedIntervalAfterCompaction = newIntervals;
- checkCompactionIntervals(expectedIntervalAfterCompaction);
- }
- }
-
- @Test
- public void testCompactionWithTimestampDimension() throws Exception
- {
- loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, null);
- }
-
- private void loadDataAndCompact(
- String indexTask,
- String queriesResource,
- String compactionResource,
- GranularityType newSegmentGranularity
- ) throws Exception
- {
- try (final Closeable ignored = unloader(fullDatasourceName)) {
- loadData(indexTask, fullDatasourceName);
- // 4 segments across 2 days
- checkNumberOfSegments(4);
- List expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
- expectedIntervalAfterCompaction.sort(null);
-
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
- String queryResponseTemplate = getQueryResponseTemplate(queriesResource);
-
- queryResponseTemplate = StringUtils.replace(
- queryResponseTemplate,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("0")
- );
-
- queryHelper.testQueriesFromString(queryResponseTemplate);
- String taskId = compactData(compactionResource, newSegmentGranularity, null);
-
- // The original 4 segments should be compacted into 2 new segments
- checkNumberOfSegments(2);
- queryHelper.testQueriesFromString(queryResponseTemplate);
- checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 2);
-
- if (newSegmentGranularity != null) {
- List newIntervals = new ArrayList<>();
- for (String interval : expectedIntervalAfterCompaction) {
- for (Interval newinterval : newSegmentGranularity.getDefaultGranularity().getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
- newIntervals.add(newinterval.toString());
- }
- }
- expectedIntervalAfterCompaction = newIntervals;
- }
- checkCompactionIntervals(expectedIntervalAfterCompaction);
-
- Map reports = indexer.getTaskReport(taskId);
- Assert.assertTrue(reports != null && reports.size() > 0);
- }
- }
-
- private String compactData(
- String compactionResource,
- GranularityType newSegmentGranularity,
- GranularityType newQueryGranularity
- ) throws Exception
- {
- String template = getResourceAsString(compactionResource);
- template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
- // For the new granularitySpec map
- Map granularityMap = new HashMap<>();
- if (newSegmentGranularity != null) {
- granularityMap.put("segmentGranularity", newSegmentGranularity.name());
- }
- if (newQueryGranularity != null) {
- granularityMap.put("queryGranularity", newQueryGranularity.name());
- }
- template = StringUtils.replace(
- template,
- "%%GRANULARITY_SPEC%%",
- jsonMapper.writeValueAsString(granularityMap)
- );
- // For the deprecated segment granularity field
- if (newSegmentGranularity != null) {
- template = StringUtils.replace(
- template,
- "%%SEGMENT_GRANULARITY%%",
- newSegmentGranularity.name()
- );
- }
- final String taskID = indexer.submitTask(template);
- LOG.info("TaskID for compaction task %s", taskID);
- indexer.waitUntilTaskCompletes(taskID);
-
- ITRetryUtil.retryUntilTrue(
- () -> coordinator.areSegmentsLoaded(fullDatasourceName),
- "Segment Compaction"
- );
-
- return taskID;
- }
-
- private void checkQueryGranularity(String queryResource, String expectedQueryGranularity, int segmentCount) throws Exception
- {
- String queryResponseTemplate;
- try {
- InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryResource);
- queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
- }
- catch (IOException e) {
- throw new ISE(e, "could not read query file: %s", queryResource);
- }
-
- queryResponseTemplate = StringUtils.replace(
- queryResponseTemplate,
- "%%DATASOURCE%%",
- fullDatasourceName
- );
- queryResponseTemplate = StringUtils.replace(
- queryResponseTemplate,
- "%%ANALYSIS_TYPE%%",
- "queryGranularity"
- );
- queryResponseTemplate = StringUtils.replace(
- queryResponseTemplate,
- "%%INTERVALS%%",
- "2013-08-31/2013-09-02"
- );
- List