diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java index 38f0a86223e6..3062a6fa04ba 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java @@ -49,31 +49,31 @@ private TaskResponseObject( this.status = status; } - @SuppressWarnings("unused") // Used by Jackson serialization? + @JsonProperty public String getId() { return id; } - @SuppressWarnings("unused") // Used by Jackson serialization? + @JsonProperty public String getType() { return type; } - @SuppressWarnings("unused") // Used by Jackson serialization? + @JsonProperty public DateTime getCreatedTime() { return createdTime; } - @SuppressWarnings("unused") // Used by Jackson serialization? + @JsonProperty public DateTime getQueueInsertionTime() { return queueInsertionTime; } - @SuppressWarnings("unused") // Used by Jackson serialization? 
+ @JsonProperty public TaskState getStatus() { return status; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java index f8fd41a49fc0..0c88619bc128 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java @@ -31,7 +31,9 @@ import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.clients.OverlordResourceTestClient; import org.apache.druid.testing.clients.TaskResponseObject; +import org.apache.druid.testing.utils.DataLoaderHelper; import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.SqlTestQueryHelper; import org.apache.druid.testing.utils.TestQueryHelper; import org.joda.time.Interval; @@ -62,6 +64,10 @@ public abstract class AbstractIndexerTest protected ObjectMapper smileMapper; @Inject protected TestQueryHelper queryHelper; + @Inject + protected SqlTestQueryHelper sqlQueryHelper; + @Inject + protected DataLoaderHelper dataLoaderHelper; @Inject protected IntegrationTestingConfig config; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 5ea11e6992cd..06f2841c4da7 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -29,6 +29,7 @@ import org.apache.druid.testing.utils.StreamAdminClient; import org.apache.druid.testing.utils.StreamEventWriter; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.function.Function; @@ -53,6 +54,7 @@ 
Function generateStreamIngestionPropsTransform( String fullDatasourceName, String parserType, String parserOrInputFormat, + List dimensions, IntegrationTestingConfig config ) { @@ -117,13 +119,16 @@ Function generateStreamIngestionPropsTransform( "%%STREAM_PROPERTIES_KEY%%", "consumerProperties" ); - spec = StringUtils.replace( spec, "%%SCHEMA_REGISTRY_HOST%%", StringUtils.format("http://%s", config.getSchemaRegistryInternalHost()) ); - + spec = StringUtils.replace( + spec, + "%%DIMENSIONS%%", + jsonMapper.writeValueAsString(dimensions) + ); return StringUtils.replace( spec, "%%STREAM_PROPERTIES_VALUE%%", diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java index 5854a6234445..b83ea95ec172 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java @@ -28,6 +28,7 @@ import org.apache.druid.testing.utils.StreamEventWriter; import javax.annotation.Nullable; +import java.util.List; import java.util.function.Function; public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest @@ -59,6 +60,7 @@ Function generateStreamIngestionPropsTransform( String fullDatasourceName, String parserType, String parserOrInputFormat, + List dimensions, IntegrationTestingConfig config ) { @@ -122,6 +124,11 @@ Function generateStreamIngestionPropsTransform( "%%SCHEMA_REGISTRY_HOST%%", StringUtils.format("http://%s", config.getSchemaRegistryInternalHost()) ); + spec = StringUtils.replace( + spec, + "%%DIMENSIONS%%", + jsonMapper.writeValueAsString(dimensions) + ); return StringUtils.replace( spec, "%%STREAM_PROPERTIES_VALUE%%", diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 5ce872897525..c98d2498033e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -19,6 +19,7 @@ package org.apache.druid.tests.indexer; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.inject.Inject; @@ -73,7 +74,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest // The value to this tag is a timestamp that can be used by a lambda function to remove unused stream. private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after"; private static final int STREAM_SHARD_COUNT = 2; - private static final long CYCLE_PADDING_MS = 100; + protected static final long CYCLE_PADDING_MS = 100; private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json"; private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json"; @@ -93,9 +94,24 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest protected static final String INPUT_FORMAT = "inputFormat"; protected static final String INPUT_ROW_PARSER = "parser"; - private static final String JSON_INPUT_FORMAT_PATH = + protected static final String JSON_INPUT_FORMAT_PATH = String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json"); + protected static final List DEFAULT_DIMENSIONS = ImmutableList.of( + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ); + @Inject private DruidClusterAdminClient druidClusterAdminClient; @@ -117,6 +133,7 @@ abstract Function generateStreamIngestionPropsTransform( String fullDatasourceName, String 
parserType, String parserOrInputFormat, + List dimensions, IntegrationTestingConfig config ); @@ -625,7 +642,7 @@ private void testIndexWithStreamReshardHelper(@Nullable Boolean transactionEnabl } } - private void verifyIngestedData(GeneratedTestConfig generatedTestConfig, long numWritten) throws Exception + protected void verifyIngestedData(GeneratedTestConfig generatedTestConfig, long numWritten) throws Exception { // Wait for supervisor to consume events LOG.info("Waiting for stream indexing tasks to consume events"); @@ -720,6 +737,11 @@ protected class GeneratedTestConfig private Function streamQueryPropsTransform; public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception + { + this(parserType, parserOrInputFormat, DEFAULT_DIMENSIONS); + } + + public GeneratedTestConfig(String parserType, String parserOrInputFormat, List dimensions) throws Exception { streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID(); String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID(); @@ -741,6 +763,7 @@ public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws fullDatasourceName, parserType, parserOrInputFormat, + dimensions, config ); streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java new file mode 100644 index 000000000000..9a9557dc2002 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.indexer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.testing.clients.TaskResponseObject; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.JsonEventSerializer; +import org.apache.druid.testing.utils.SqlQueryWithResults; +import org.apache.druid.testing.utils.StreamEventWriter; +import org.apache.druid.testing.utils.StreamGenerator; +import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator; +import org.apache.druid.tests.TestNGGroup; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITNilColumnTest extends AbstractKafkaIndexingServiceTest 
+{ + private static final Logger LOG = new Logger(ITNilColumnTest.class); + private static final String NIL_DIM1 = "nilDim1"; + private static final String NIL_DIM2 = "nilDim2"; + + private final List dimensions; + + public ITNilColumnTest() + { + this.dimensions = new ArrayList<>(DEFAULT_DIMENSIONS.size() + 2); + dimensions.add(NIL_DIM1); + dimensions.addAll(DEFAULT_DIMENSIONS); + dimensions.add(NIL_DIM2); + } + + @Override + public String getTestNamePrefix() + { + return "nil-column-test"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @Test + public void testQueryNilColumnBeforeAndAfterPublishingSegments() throws Exception + { + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( + INPUT_FORMAT, + getResourceAsString(JSON_INPUT_FORMAT_PATH), + dimensions + ); + try ( + final Closeable closer = createResourceCloser(generatedTestConfig); + final StreamEventWriter streamEventWriter = createStreamEventWriter(config, true) + ) { + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform() + .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); + LOG.info("Submitted supervisor"); + // Start generating the data + final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator( + new JsonEventSerializer(jsonMapper), + EVENTS_PER_SECOND, + CYCLE_PADDING_MS + ); + long numWritten = streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + TOTAL_NUMBER_OF_SECOND, + FIRST_EVENT_TIME + ); + // Verify supervisor is healthy before termination + ITRetryUtil.retryUntil( + () -> BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be healthy" + ); + // 60 events should have been ingested as per 
EVENTS_PER_SECOND and TOTAL_NUMBER_OF_SECOND. + // Since maxRowsInMemory is set to 500,000, every row should be in incrementalIndex. + // So, let's test if SQL finds nil dimensions from incrementalIndexes. + dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName()); + final List queryWithResults = getQueryWithResults(generatedTestConfig); + sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults)); + final List metadataQueryWithResults = getMetadataQueryWithResults(generatedTestConfig); + sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults)); + + // Terminate the supervisor + indexer.terminateSupervisor(generatedTestConfig.getSupervisorId()); + ITRetryUtil.retryUntilTrue( + () -> { + List tasks = indexer + .getRunningTasks() + .stream() + .filter(task -> task.getId().contains(generatedTestConfig.getFullDatasourceName())) + .filter(task -> "index_kafka".equals(task.getType())) + .collect(Collectors.toList()); + LOG.info("[%s] tasks are running", tasks.stream().map(task -> { + try { + return jsonMapper.writeValueAsString(task); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList())); + return tasks.isEmpty(); + }, + "Waiting for all tasks to stop" + ); + + // Now, we should have published all segments. + // Let's test if SQL finds nil dimensions from queryableIndexes. 
+ dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName()); + verifyIngestedData(generatedTestConfig, numWritten); + + sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults)); + sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults)); + } + } + + private static List getQueryWithResults(GeneratedTestConfig generatedTestConfig) + { + return ImmutableList.of( + new SqlQueryWithResults( + new SqlQuery( + StringUtils.format( + "SELECT count(*) FROM \"%s\" WHERE %s IS NOT NULL OR %s IS NOT NULL", + generatedTestConfig.getFullDatasourceName(), + NIL_DIM1, + NIL_DIM2 + ), + null, + false, + false, + false, + null, + null + ), + ImmutableList.of(ImmutableMap.of("EXPR$0", 0)) + ) + ); + } + + private List getMetadataQueryWithResults(GeneratedTestConfig generatedTestConfig) + { + return ImmutableList.of( + new SqlQueryWithResults( + new SqlQuery( + StringUtils.format( + "SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE" + + " FROM INFORMATION_SCHEMA.COLUMNS" + + " WHERE TABLE_NAME = '%s' AND COLUMN_NAME IN ('%s', '%s')", + generatedTestConfig.getFullDatasourceName(), + NIL_DIM1, + NIL_DIM2 + ), + null, + false, + false, + false, + null, + null + ), + ImmutableList.of( + ImmutableMap.of( + "COLUMN_NAME", + NIL_DIM1, + "IS_NULLABLE", + "YES", + "DATA_TYPE", + "VARCHAR" + ), + ImmutableMap.of( + "COLUMN_NAME", + NIL_DIM2, + "IS_NULLABLE", + "YES", + "DATA_TYPE", + "VARCHAR" + ) + ) + ) + ); + } +} diff --git a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json index d92116500333..bc668122fa49 100644 --- a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json +++ b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json @@ -8,7 +8,7 @@ "format": "auto" }, "dimensionsSpec": { - "dimensions": ["page", "language", "user", "unpatrolled", 
"newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"], + "dimensions": %%DIMENSIONS%%, "dimensionExclusions": [], "spatialDimensions": [] },